123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282 |
- package services
- import (
- "encoding/csv"
- "fmt"
- "io"
- "log"
- "os"
- "path/filepath"
- "strings"
- "sync"
- "time"
- "xg_fetl/internal/db_executor"
- "xg_fetl/internal/models"
- )
- // ReaderMain 函数读取 CSV 文件并将数据插入数据库
- func ReaderMain(
- dbName string, // 数据库名称
- dirPath string, // CSV 文件所在的根目录
- tableInfos map[string]models.TableInfo, // 要处理的表信息
- delimiter rune, // CSV 文件的分隔符
- pageSize int, // 分页大小,每次读取的记录数
- mysqeExec db_executor.DBExecutor, // 数据库执行器接口
- ) {
- // 创建日志文件
- logfileLocation := filepath.Join(dirPath, "import.log")
- logFile, err := os.OpenFile(logfileLocation, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
- if err != nil {
- fmt.Printf("无法创建日志文件:%v\n", err)
- return
- }
- defer logFile.Close()
- // 设置日志输出到文件和控制台
- multiWriter := io.MultiWriter(os.Stdout, logFile)
- logger := log.New(multiWriter, "", log.LstdFlags)
- // 创建一个等待组,用于等待所有表的处理完成
- var wg sync.WaitGroup
- // 创建一个通道,用于收集每个表的处理结果
- resultsChan := make(chan TableResult)
- // 遍历每个表,启动协程进行处理
- for tableName, tableInfo := range tableInfos {
- wg.Add(1)
- go func(tableName string, tableInfo models.TableInfo) {
- defer wg.Done()
- // 初始化 TableResult
- result := TableResult{
- TableName: tableName,
- Success: true,
- GeneratedFiles: 0,
- ErrorCount: 0,
- GoroutineCount: 0,
- TotalRows: 0,
- AverageRowSize: 0,
- ExportDuration: 0,
- TableSize: 0,
- Logs: []string{},
- }
- // 记录“开始处理表”日志
- logMessage(&result, fmt.Sprintf("开始处理表:%s\n", tableName), logger)
- // 处理单个表
- result = processTable(dbName, dirPath, tableName, tableInfo, delimiter, pageSize, mysqeExec, &result, logger)
- // 将结果发送到通道
- resultsChan <- result
- // 记录“完成处理表”日志
- logMessage(&result, fmt.Sprintf("完成处理表:%s\n", tableName), logger)
- }(tableName, tableInfo)
- }
- // 启动一个协程,等待所有表处理完毕后关闭结果通道
- go func() {
- wg.Wait()
- close(resultsChan)
- }()
- // 收集所有处理结果到 resultsMap
- resultsMap := make(map[string]TableResult)
- // 从结果通道接收每个表的处理结果
- for result := range resultsChan {
- // 处理结果,例如输出日志或进行其他处理
- logMessage(&result, fmt.Sprintf("表 %s 处理完成,成功:%v,错误数量:%d,总行数:%d\n",
- result.TableName, result.Success, result.ErrorCount, result.TotalRows), logger)
- // 将结果存储到 resultsMap
- resultsMap[result.TableName] = result
- }
- // 调用 WriteImportReportToLogFile 将总览信息和详细日志写入日志文件
- err = WriteImportReportToLogFile("import_report.log", resultsMap, tableInfos)
- if err != nil {
- logger.Printf("写入导入报告失败:%v\n", err)
- } else {
- logger.Println("导入报告已成功写入 import_report.log")
- }
- }
- // 处理单个表的函数
- func processTable(
- dbName string,
- dirPath string,
- tableName string,
- tableInfo models.TableInfo,
- delimiter rune,
- pageSize int,
- mysqeExec db_executor.DBExecutor,
- result *TableResult,
- logger *log.Logger,
- ) TableResult {
- logMessage(result, fmt.Sprintf("开始处理表 %s 的数据\n", tableName), logger)
- // 记录开始时间
- startTime := time.Now()
- // 构建表对应的目录路径
- tableDir := filepath.Join(dirPath, tableName)
- logMessage(result, fmt.Sprintf("表 %s 的目录路径为:%s\n", tableName, tableDir), logger)
- // 获取该表目录下的所有 CSV 文件
- files, err := os.ReadDir(tableDir)
- if err != nil {
- result.Success = false
- result.Error = err
- logMsg := fmt.Sprintf("读取目录失败:%v\n", err)
- logMessage(result, logMsg, logger)
- return *result
- }
- // 用于统计行大小的变量
- var totalRowSize int64 = 0
- // 遍历所有 CSV 文件,启动协程处理
- var fileWg sync.WaitGroup
- // 控制最大并发数
- maxGoroutines := 5
- semaphore := make(chan struct{}, maxGoroutines)
- for _, file := range files {
- if !file.IsDir() && strings.HasPrefix(file.Name(), tableName+"_") && strings.HasSuffix(file.Name(), ".csv") {
- // 满足条件的文件
- filePath := filepath.Join(tableDir, file.Name())
- result.GeneratedFiles++
- fileWg.Add(1)
- result.GoroutineCount++
- logMessage(result, fmt.Sprintf("发现文件:%s,启动协程进行处理\n", filePath), logger)
- go func(filePath string) {
- defer fileWg.Done()
- semaphore <- struct{}{} // 获取信号量
- defer func() { <-semaphore }() // 释放信号量
- logMessage(result, fmt.Sprintf("开始处理文件:%s\n", filePath), logger)
- // 处理单个 CSV 文件
- err := processCSVFile(filePath, tableName, delimiter, pageSize, mysqeExec, result, &totalRowSize, logger)
- if err != nil {
- result.ErrorCount++
- logMsg := fmt.Sprintf("处理文件 %s 失败:%v\n", filePath, err)
- logMessage(result, logMsg, logger)
- } else {
- logMessage(result, fmt.Sprintf("文件 %s 处理成功\n", filePath), logger)
- }
- }(filePath)
- }
- }
- // 等待所有文件处理完毕
- fileWg.Wait()
- // 计算统计信息
- result.ExportDuration = time.Since(startTime)
- if result.TotalRows > 0 {
- result.AverageRowSize = totalRowSize / int64(result.TotalRows)
- }
- logMessage(result, fmt.Sprintf("表 %s 的数据处理完成,耗时:%v\n", tableName, result.ExportDuration), logger)
- return *result
- }
- // 处理单个 CSV 文件的函数
- func processCSVFile(
- filePath string,
- tableName string,
- delimiter rune,
- pageSize int,
- mysqeExec db_executor.DBExecutor,
- result *TableResult,
- totalRowSize *int64,
- logger *log.Logger,
- ) error {
- // 打开 CSV 文件
- file, err := os.Open(filePath)
- if err != nil {
- return fmt.Errorf("打开文件失败:%v", err)
- }
- defer file.Close()
- logMessage(result, fmt.Sprintf("打开文件成功:%s\n", filePath), logger)
- // 获取文件大小,更新表的容量
- fileInfo, err := file.Stat()
- if err == nil {
- result.TableSize += fileInfo.Size()
- logMessage(result, fmt.Sprintf("文件大小:%d 字节,累计表大小:%d 字节\n", fileInfo.Size(), result.TableSize), logger)
- }
- // 创建 CSV Reader
- reader := csv.NewReader(file)
- reader.Comma = delimiter
- // 读取表头
- headers, err := reader.Read()
- if err != nil {
- return fmt.Errorf("读取表头失败:%v", err)
- }
- logMessage(result, fmt.Sprintf("读取表头成功:%v\n", headers), logger)
- // 初始化记录批次
- var recordsBatch [][]string
- for {
- // 读取一行记录
- record, err := reader.Read()
- if err == io.EOF {
- // 文件读取完毕,插入剩余的记录
- if len(recordsBatch) > 0 {
- logMessage(result, fmt.Sprintf("文件读取完毕,插入剩余的 %d 条记录到数据库\n", len(recordsBatch)), logger)
- err = mysqeExec.InsertRecordsToDB(tableName, headers, recordsBatch)
- if err != nil {
- return fmt.Errorf("插入数据库失败:%v", err)
- }
- // 更新总行数
- result.TotalRows += len(recordsBatch)
- // 计算行大小
- for _, rec := range recordsBatch {
- *totalRowSize += int64(len(strings.Join(rec, "")))
- }
- logMessage(result, fmt.Sprintf("成功插入剩余记录,累计总行数:%d\n", result.TotalRows), logger)
- }
- break
- }
- if err != nil {
- return fmt.Errorf("读取记录失败:%v", err)
- }
- // 添加到批次
- recordsBatch = append(recordsBatch, record)
- // 如果达到批次大小,插入数据库
- if len(recordsBatch) >= pageSize {
- logMessage(result, fmt.Sprintf("达到批次大小 %d,插入记录到数据库\n", pageSize), logger)
- err = mysqeExec.InsertRecordsToDB(tableName, headers, recordsBatch)
- if err != nil {
- return fmt.Errorf("插入数据库失败:%v", err)
- }
- // 更新总行数
- result.TotalRows += len(recordsBatch)
- // 计算行大小
- for _, rec := range recordsBatch {
- *totalRowSize += int64(len(strings.Join(rec, "")))
- }
- logMessage(result, fmt.Sprintf("成功插入记录,累计总行数:%d\n", result.TotalRows), logger)
- // 清空批次
- recordsBatch = recordsBatch[:0]
- }
- }
- return nil
- }
|