|
- package services
- import (
- "encoding/csv"
- "fmt"
- "io"
- "log"
- "os"
- "path/filepath"
- "strings"
- "sync"
- "time"
- "xg_fetl/internal/db_executor"
- "xg_fetl/internal/models"
- )
- func ReaderMain(
- dbName string, // 数据库名称
- dirPath string, // CSV 文件所在的根目录
- tableInfos map[string]models.TableInfo, // 要处理的表信息
- delimiter rune, // CSV 文件的分隔符
- pageSize int, // 分页大小,每次读取的记录数
- mysqeExec db_executor.DBExecutor, // 数据库执行器接口
- ) {
-
- logfileLocation := filepath.Join(dirPath, "import.log")
- logFile, err := os.OpenFile(logfileLocation, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
- if err != nil {
- fmt.Printf("无法创建日志文件:%v\n", err)
- return
- }
- defer logFile.Close()
-
- multiWriter := io.MultiWriter(os.Stdout, logFile)
- logger := log.New(multiWriter, "", log.LstdFlags)
-
- var wg sync.WaitGroup
-
- resultsChan := make(chan TableResult)
-
- for tableName, tableInfo := range tableInfos {
- wg.Add(1)
- go func(tableName string, tableInfo models.TableInfo) {
- defer wg.Done()
-
- result := TableResult{
- TableName: tableName,
- Success: true,
- GeneratedFiles: 0,
- ErrorCount: 0,
- GoroutineCount: 0,
- TotalRows: 0,
- AverageRowSize: 0,
- ExportDuration: 0,
- TableSize: 0,
- Logs: []string{},
- }
-
- logMessage(&result, fmt.Sprintf("开始处理表:%s\n", tableName), logger)
-
- result = processTable(dbName, dirPath, tableName, tableInfo, delimiter, pageSize, mysqeExec, &result, logger)
-
- resultsChan <- result
-
- logMessage(&result, fmt.Sprintf("完成处理表:%s\n", tableName), logger)
- }(tableName, tableInfo)
- }
-
- go func() {
- wg.Wait()
- close(resultsChan)
- }()
-
- resultsMap := make(map[string]TableResult)
-
- for result := range resultsChan {
-
- logMessage(&result, fmt.Sprintf("表 %s 处理完成,成功:%v,错误数量:%d,总行数:%d\n",
- result.TableName, result.Success, result.ErrorCount, result.TotalRows), logger)
-
- resultsMap[result.TableName] = result
- }
-
- err = WriteImportReportToLogFile("import_report.log", resultsMap, tableInfos)
- if err != nil {
- logger.Printf("写入导入报告失败:%v\n", err)
- } else {
- logger.Println("导入报告已成功写入 import_report.log")
- }
- }
- func processTable(
- dbName string,
- dirPath string,
- tableName string,
- tableInfo models.TableInfo,
- delimiter rune,
- pageSize int,
- mysqeExec db_executor.DBExecutor,
- result *TableResult,
- logger *log.Logger,
- ) TableResult {
- logMessage(result, fmt.Sprintf("开始处理表 %s 的数据\n", tableName), logger)
-
- startTime := time.Now()
-
- tableDir := filepath.Join(dirPath, tableName)
- logMessage(result, fmt.Sprintf("表 %s 的目录路径为:%s\n", tableName, tableDir), logger)
-
- files, err := os.ReadDir(tableDir)
- if err != nil {
- result.Success = false
- result.Error = err
- logMsg := fmt.Sprintf("读取目录失败:%v\n", err)
- logMessage(result, logMsg, logger)
- return *result
- }
-
- var totalRowSize int64 = 0
-
- var fileWg sync.WaitGroup
-
- maxGoroutines := 5
- semaphore := make(chan struct{}, maxGoroutines)
- for _, file := range files {
- if !file.IsDir() && strings.HasPrefix(file.Name(), tableName+"_") && strings.HasSuffix(file.Name(), ".csv") {
-
- filePath := filepath.Join(tableDir, file.Name())
- result.GeneratedFiles++
- fileWg.Add(1)
- result.GoroutineCount++
- logMessage(result, fmt.Sprintf("发现文件:%s,启动协程进行处理\n", filePath), logger)
- go func(filePath string) {
- defer fileWg.Done()
- semaphore <- struct{}{}
- defer func() { <-semaphore }()
- logMessage(result, fmt.Sprintf("开始处理文件:%s\n", filePath), logger)
-
- err := processCSVFile(filePath, tableName, delimiter, pageSize, mysqeExec, result, &totalRowSize, logger)
- if err != nil {
- result.ErrorCount++
- logMsg := fmt.Sprintf("处理文件 %s 失败:%v\n", filePath, err)
- logMessage(result, logMsg, logger)
- } else {
- logMessage(result, fmt.Sprintf("文件 %s 处理成功\n", filePath), logger)
- }
- }(filePath)
- }
- }
-
- fileWg.Wait()
-
- result.ExportDuration = time.Since(startTime)
- if result.TotalRows > 0 {
- result.AverageRowSize = totalRowSize / int64(result.TotalRows)
- }
- logMessage(result, fmt.Sprintf("表 %s 的数据处理完成,耗时:%v\n", tableName, result.ExportDuration), logger)
- return *result
- }
- func processCSVFile(
- filePath string,
- tableName string,
- delimiter rune,
- pageSize int,
- mysqeExec db_executor.DBExecutor,
- result *TableResult,
- totalRowSize *int64,
- logger *log.Logger,
- ) error {
-
- file, err := os.Open(filePath)
- if err != nil {
- return fmt.Errorf("打开文件失败:%v", err)
- }
- defer file.Close()
- logMessage(result, fmt.Sprintf("打开文件成功:%s\n", filePath), logger)
-
- fileInfo, err := file.Stat()
- if err == nil {
- result.TableSize += fileInfo.Size()
- logMessage(result, fmt.Sprintf("文件大小:%d 字节,累计表大小:%d 字节\n", fileInfo.Size(), result.TableSize), logger)
- }
-
- reader := csv.NewReader(file)
- reader.Comma = delimiter
-
- headers, err := reader.Read()
- if err != nil {
- return fmt.Errorf("读取表头失败:%v", err)
- }
- logMessage(result, fmt.Sprintf("读取表头成功:%v\n", headers), logger)
-
- var recordsBatch [][]string
- for {
-
- record, err := reader.Read()
- if err == io.EOF {
-
- if len(recordsBatch) > 0 {
- logMessage(result, fmt.Sprintf("文件读取完毕,插入剩余的 %d 条记录到数据库\n", len(recordsBatch)), logger)
- err = mysqeExec.InsertRecordsToDB(tableName, headers, recordsBatch)
- if err != nil {
- return fmt.Errorf("插入数据库失败:%v", err)
- }
-
- result.TotalRows += len(recordsBatch)
-
- for _, rec := range recordsBatch {
- *totalRowSize += int64(len(strings.Join(rec, "")))
- }
- logMessage(result, fmt.Sprintf("成功插入剩余记录,累计总行数:%d\n", result.TotalRows), logger)
- }
- break
- }
- if err != nil {
- return fmt.Errorf("读取记录失败:%v", err)
- }
-
- recordsBatch = append(recordsBatch, record)
-
- if len(recordsBatch) >= pageSize {
- logMessage(result, fmt.Sprintf("达到批次大小 %d,插入记录到数据库\n", pageSize), logger)
- err = mysqeExec.InsertRecordsToDB(tableName, headers, recordsBatch)
- if err != nil {
- return fmt.Errorf("插入数据库失败:%v", err)
- }
-
- result.TotalRows += len(recordsBatch)
-
- for _, rec := range recordsBatch {
- *totalRowSize += int64(len(strings.Join(rec, "")))
- }
- logMessage(result, fmt.Sprintf("成功插入记录,累计总行数:%d\n", result.TotalRows), logger)
-
- recordsBatch = recordsBatch[:0]
- }
- }
- return nil
- }
|