package services import ( "encoding/csv" "fmt" "io" "log" "os" "path/filepath" "strings" "sync" "time" "xg_fetl/internal/db_executor" "xg_fetl/internal/models" ) // ReaderMain 函数读取 CSV 文件并将数据插入数据库 func ReaderMain( dbName string, // 数据库名称 dirPath string, // CSV 文件所在的根目录 tableInfos map[string]models.TableInfo, // 要处理的表信息 delimiter rune, // CSV 文件的分隔符 pageSize int, // 分页大小,每次读取的记录数 mysqeExec db_executor.DBExecutor, // 数据库执行器接口 ) { // 创建日志文件 logfileLocation := filepath.Join(dirPath, "import.log") logFile, err := os.OpenFile(logfileLocation, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) if err != nil { fmt.Printf("无法创建日志文件:%v\n", err) return } defer logFile.Close() // 设置日志输出到文件和控制台 multiWriter := io.MultiWriter(os.Stdout, logFile) logger := log.New(multiWriter, "", log.LstdFlags) // 创建一个等待组,用于等待所有表的处理完成 var wg sync.WaitGroup // 创建一个通道,用于收集每个表的处理结果 resultsChan := make(chan TableResult) // 遍历每个表,启动协程进行处理 for tableName, tableInfo := range tableInfos { wg.Add(1) go func(tableName string, tableInfo models.TableInfo) { defer wg.Done() // 初始化 TableResult result := TableResult{ TableName: tableName, Success: true, GeneratedFiles: 0, ErrorCount: 0, GoroutineCount: 0, TotalRows: 0, AverageRowSize: 0, ExportDuration: 0, TableSize: 0, Logs: []string{}, } // 记录“开始处理表”日志 logMessage(&result, fmt.Sprintf("开始处理表:%s\n", tableName), logger) // 处理单个表 result = processTable(dbName, dirPath, tableName, tableInfo, delimiter, pageSize, mysqeExec, &result, logger) // 将结果发送到通道 resultsChan <- result // 记录“完成处理表”日志 logMessage(&result, fmt.Sprintf("完成处理表:%s\n", tableName), logger) }(tableName, tableInfo) } // 启动一个协程,等待所有表处理完毕后关闭结果通道 go func() { wg.Wait() close(resultsChan) }() // 收集所有处理结果到 resultsMap resultsMap := make(map[string]TableResult) // 从结果通道接收每个表的处理结果 for result := range resultsChan { // 处理结果,例如输出日志或进行其他处理 logMessage(&result, fmt.Sprintf("表 %s 处理完成,成功:%v,错误数量:%d,总行数:%d\n", result.TableName, result.Success, result.ErrorCount, result.TotalRows), logger) // 将结果存储到 resultsMap resultsMap[result.TableName] = result } // 调用 WriteImportReportToLogFile 将总览信息和详细日志写入日志文件 err = WriteImportReportToLogFile("import_report.log", resultsMap, tableInfos) if err != nil { logger.Printf("写入导入报告失败:%v\n", err) } else { logger.Println("导入报告已成功写入 import_report.log") } } // 处理单个表的函数 func processTable( dbName string, dirPath string, tableName string, tableInfo models.TableInfo, delimiter rune, pageSize int, mysqeExec db_executor.DBExecutor, result *TableResult, logger *log.Logger, ) TableResult { logMessage(result, fmt.Sprintf("开始处理表 %s 的数据\n", tableName), logger) // 记录开始时间 startTime := time.Now() // 构建表对应的目录路径 tableDir := filepath.Join(dirPath, tableName) logMessage(result, fmt.Sprintf("表 %s 的目录路径为:%s\n", tableName, tableDir), logger) // 获取该表目录下的所有 CSV 文件 files, err := os.ReadDir(tableDir) if err != nil { result.Success = false result.Error = err logMsg := fmt.Sprintf("读取目录失败:%v\n", err) logMessage(result, logMsg, logger) return *result } // 用于统计行大小的变量 var totalRowSize int64 = 0 // 遍历所有 CSV 文件,启动协程处理 var fileWg sync.WaitGroup // 控制最大并发数 maxGoroutines := 5 semaphore := make(chan struct{}, maxGoroutines) for _, file := range files { if !file.IsDir() && strings.HasPrefix(file.Name(), tableName+"_") && strings.HasSuffix(file.Name(), ".csv") { // 满足条件的文件 filePath := filepath.Join(tableDir, file.Name()) result.GeneratedFiles++ fileWg.Add(1) result.GoroutineCount++ logMessage(result, fmt.Sprintf("发现文件:%s,启动协程进行处理\n", filePath), logger) go func(filePath string) { defer fileWg.Done() semaphore <- struct{}{} // 获取信号量 defer func() { <-semaphore }() // 释放信号量 logMessage(result, fmt.Sprintf("开始处理文件:%s\n", filePath), logger) // 处理单个 CSV 文件 err := processCSVFile(filePath, tableName, delimiter, pageSize, mysqeExec, result, &totalRowSize, logger) if err != nil { result.ErrorCount++ logMsg := fmt.Sprintf("处理文件 %s 失败:%v\n", filePath, err) logMessage(result, logMsg, logger) } else { logMessage(result, fmt.Sprintf("文件 %s 处理成功\n", filePath), logger) } }(filePath) } } // 等待所有文件处理完毕 fileWg.Wait() // 计算统计信息 result.ExportDuration = time.Since(startTime) if result.TotalRows > 0 { result.AverageRowSize = totalRowSize / int64(result.TotalRows) } logMessage(result, fmt.Sprintf("表 %s 的数据处理完成,耗时:%v\n", tableName, result.ExportDuration), logger) return *result } // 处理单个 CSV 文件的函数 func processCSVFile( filePath string, tableName string, delimiter rune, pageSize int, mysqeExec db_executor.DBExecutor, result *TableResult, totalRowSize *int64, logger *log.Logger, ) error { // 打开 CSV 文件 file, err := os.Open(filePath) if err != nil { return fmt.Errorf("打开文件失败:%v", err) } defer file.Close() logMessage(result, fmt.Sprintf("打开文件成功:%s\n", filePath), logger) // 获取文件大小,更新表的容量 fileInfo, err := file.Stat() if err == nil { result.TableSize += fileInfo.Size() logMessage(result, fmt.Sprintf("文件大小:%d 字节,累计表大小:%d 字节\n", fileInfo.Size(), result.TableSize), logger) } // 创建 CSV Reader reader := csv.NewReader(file) reader.Comma = delimiter // 读取表头 headers, err := reader.Read() if err != nil { return fmt.Errorf("读取表头失败:%v", err) } logMessage(result, fmt.Sprintf("读取表头成功:%v\n", headers), logger) // 初始化记录批次 var recordsBatch [][]string for { // 读取一行记录 record, err := reader.Read() if err == io.EOF { // 文件读取完毕,插入剩余的记录 if len(recordsBatch) > 0 { logMessage(result, fmt.Sprintf("文件读取完毕,插入剩余的 %d 条记录到数据库\n", len(recordsBatch)), logger) err = mysqeExec.InsertRecordsToDB(tableName, headers, recordsBatch) if err != nil { return fmt.Errorf("插入数据库失败:%v", err) } // 更新总行数 result.TotalRows += len(recordsBatch) // 计算行大小 for _, rec := range recordsBatch { *totalRowSize += int64(len(strings.Join(rec, ""))) } logMessage(result, fmt.Sprintf("成功插入剩余记录,累计总行数:%d\n", result.TotalRows), logger) } break } if err != nil { return fmt.Errorf("读取记录失败:%v", err) } // 添加到批次 recordsBatch = append(recordsBatch, record) // 如果达到批次大小,插入数据库 if len(recordsBatch) >= pageSize { logMessage(result, fmt.Sprintf("达到批次大小 %d,插入记录到数据库\n", pageSize), logger) err = mysqeExec.InsertRecordsToDB(tableName, headers, recordsBatch) if err != nil { return fmt.Errorf("插入数据库失败:%v", err) } // 更新总行数 result.TotalRows += len(recordsBatch) // 计算行大小 for _, rec := range recordsBatch { *totalRowSize += int64(len(strings.Join(rec, ""))) } logMessage(result, fmt.Sprintf("成功插入记录,累计总行数:%d\n", result.TotalRows), logger) // 清空批次 recordsBatch = recordsBatch[:0] } } return nil }