GTong 8 hónapja
szülő
commit
90c5ec36d6

+ 0 - 0
.gitingore → .gitignore


+ 25 - 9
cmd/export.go

@@ -3,6 +3,7 @@ package cmd
 import (
 	"fmt"
 	"os"
+	"strings"
 	"xg_fetl/internal/controllers"
 
 	"github.com/spf13/cobra"
@@ -14,27 +15,42 @@ var exportCmd = &cobra.Command{
 	Short: "导出数据模式",
 	Run: func(cmd *cobra.Command, args []string) {
 		// 定义参数变量
-		outputDir, _ := cmd.Flags().GetString("output")
-		tableName, _ := cmd.Flags().GetString("table")
 
+		whitelistTable, _ := cmd.Flags().GetString("whitelist table")
+		blacklistTable, _ := cmd.Flags().GetString("blacklist table")
 		// 检查参数
-		if tableName == "" {
-			fmt.Println("错误:必须指定表对象名 (-t)")
+		// if dbName == "" {
+		// 	fmt.Println("错误:必须指定数据库名 (-t)")
+		// 	cmd.Usage()
+		// 	os.Exit(1)
+		// }
+		// 验证两个参数不能同时存在
+		if whitelistTable != "" && blacklistTable != "" {
+			fmt.Println("错误:`whitelist-table` 和 `blacklist-table` 不能同时指定。请选择其中一个。")
 			cmd.Usage()
 			os.Exit(1)
 		}
 
 		// 打印接收到的参数
 		fmt.Printf("执行模式: 导出\n")
-		fmt.Printf("输出目录: %s\n", outputDir)
-		fmt.Printf("表对象名: %s\n", tableName)
+		fmt.Printf("白名单 表名: %s\n", whitelistTable)
+		fmt.Printf("黑名单 表名: %s\n", blacklistTable)
+		if whitelistTable != "" {
+			parts := strings.Split(whitelistTable, ",")
+			controllers.ExportController(parts)
+		}
+		if blacklistTable != "" {
+			parts := strings.Split(blacklistTable, ",")
+			controllers.ExportController(parts)
+		}
 
-		controllers.ExportController(outputDir)
 	},
 }
 
 // 初始化导出子命令的标志
 func init() {
-	exportCmd.Flags().StringP("output", "o", ".", "输出目录 (默认是当前目录)")
-	exportCmd.Flags().StringP("table", "t", "", "表对象名,格式为 模式名.表名 (例如: test.t1)")
+
+	exportCmd.Flags().StringP("input", "l", "", "输入目录路径 (必选)")
+	exportCmd.Flags().StringP("whitelist table", "w", "", "指定导入的表 (可选)\"表名,表名,表名\"")
+	exportCmd.Flags().StringP("blacklist table", "b", "", "指定不导入的表 (可选)\"表名,表名,表名\"")
 }

+ 18 - 11
cmd/import.go

@@ -3,7 +3,6 @@ package cmd
 import (
 	"fmt"
 	"os"
-	"xg_fetl/internal/controllers"
 
 	"github.com/spf13/cobra"
 )
@@ -15,31 +14,39 @@ var importCmd = &cobra.Command{
 	Run: func(cmd *cobra.Command, args []string) {
 		// 定义参数变量
 		inputPath, _ := cmd.Flags().GetString("input")
-		tableName, _ := cmd.Flags().GetString("table")
-
+		whitelistTable, _ := cmd.Flags().GetString("whitelist table")
+		blacklistTable, _ := cmd.Flags().GetString("blacklist table")
 		// 检查参数
 		if inputPath == "" {
-			fmt.Println("错误:必须指定文件名或文件目录 (-i)")
+			fmt.Println("错误:必须指定文件目录 (-l)")
 			cmd.Usage()
 			os.Exit(1)
 		}
-
-		if tableName == "" {
-			fmt.Println("错误:必须指定表名 (-t)")
+		// 验证两个参数不能同时存在
+		if whitelistTable != "" && blacklistTable != "" {
+			fmt.Println("错误:`whitelist-table` 和 `blacklist-table` 不能同时指定。请选择其中一个。")
 			cmd.Usage()
 			os.Exit(1)
 		}
 
+		// if tableName == "" {
+		// 	fmt.Println("错误:必须指定表名 (-t)")
+		// 	cmd.Usage()
+		// 	os.Exit(1)
+		// }
+
 		// 打印接收到的参数
 		fmt.Printf("执行模式: 导入\n")
 		fmt.Printf("输入路径: %s\n", inputPath)
-		fmt.Printf("表名: %s\n", tableName)
-		controllers.ImportController(inputPath)
+		fmt.Printf("白名单 表名: %s\n", whitelistTable)
+		fmt.Printf("黑名单 表名: %s\n", blacklistTable)
+		//	controllers.ImportController(inputPath)
 	},
 }
 
 // 初始化导入子命令的标志
 func init() {
-	importCmd.Flags().StringP("input", "i", "", "输入文件或目录路径")
-	importCmd.Flags().StringP("table", "t", "", "目标表名 (例如: t1)")
+	importCmd.Flags().StringP("input", "l", "", "输入目录路径 (必选)")
+	importCmd.Flags().StringP("whitelist table", "w", "", "指定导入的表 (可选)\"表名,表名,表名\"")
+	importCmd.Flags().StringP("blacklist table", "b", "", "指定不导入的表 (可选)\"表名,表名,表名\"")
 }

+ 16 - 11
config.toml

@@ -1,24 +1,29 @@
 [import]
 db_type = "mysql"
+ip = "127.0.0.1"
+port = 3357
 dbname = "test1"
 user = "root"
 password = "123456"
-ip = "127.0.0.1"
-port = 3306
-dir_path = "output"
-delimiter = ","
-page_size = 10000
-filename_pattern = "^(.*)_[0-9]+\\.csv$"
+
 
 [export]
 db_type = "mysql"
+ip = "127.0.0.1"
+port = 3357
 dbname = "gtong"
 user = "root"
 password = "123456"
-ip = "127.0.0.1"
-port = 3306
-table_name = "users"
-base_file_path = "./output/"
+
+
+[import.cfg]
 delimiter = ","
-max_file_size = 1024
 page_size = 10000
+filename_pattern = "^(.*)_[0-9]+\\.csv$"
+
+
+[export.cfg]
+base_file_path = "./GTool/"
+delimiter = ","
+max_file_size = 1024
+page_size = 10000

+ 73 - 0
import_report.log

@@ -0,0 +1,73 @@
+========== 导入结果总览 ==========
+成功导入的表(3 个):
+- messages
+  总行数:10000
+  平均行大小:4.97 KB
+  表的容量:73.04 MB
+  估算导入耗时:0.00 秒
+  实际导入耗时:15.31 秒
+  使用协程数:1
+- users
+  总行数:1000
+  平均行大小:0.06 KB
+  表的容量:0.07 MB
+  估算导入耗时:0.00 秒
+  实际导入耗时:2.00 秒
+  使用协程数:1
+- products
+  总行数:1000
+  平均行大小:4.87 KB
+  表的容量:4.77 MB
+  估算导入耗时:0.00 秒
+  实际导入耗时:2.05 秒
+  使用协程数:1
+
+所有表均导入成功。
+
+========== 详细日志信息 ==========
+------------ 表 messages ------------
+开始处理表:messages
+开始处理表 messages 的数据
+表 messages 的目录路径为:GTool\gtong_127.0.0.1\messages
+发现文件:GTool\gtong_127.0.0.1\messages\messages_1.csv,启动协程进行处理
+开始处理文件:GTool\gtong_127.0.0.1\messages\messages_1.csv
+打开文件成功:GTool\gtong_127.0.0.1\messages\messages_1.csv
+文件大小:76593152 字节,累计表大小:76593152 字节
+读取表头成功:[sent_at status message_id sender_id receiver_id content]
+达到批次大小 10000,插入记录到数据库
+成功插入记录,累计总行数:10000
+文件读取完毕,插入剩余的 5000 条记录到数据库
+处理文件 GTool\gtong_127.0.0.1\messages\messages_1.csv 失败:插入数据库失败:failed to execute insert statement: Error 1292 (22007): Incorrect datetime value: '836' for column 'sent_at' at row 1
+表 messages 的数据处理完成,耗时:15.3137826s
+表 messages 处理完成,成功:true,错误数量:1,总行数:10000
+
+------------ 表 products ------------
+开始处理表:products
+开始处理表 products 的数据
+表 products 的目录路径为:GTool\gtong_127.0.0.1\products
+发现文件:GTool\gtong_127.0.0.1\products\products_1.csv,启动协程进行处理
+开始处理文件:GTool\gtong_127.0.0.1\products\products_1.csv
+打开文件成功:GTool\gtong_127.0.0.1\products\products_1.csv
+文件大小:4998278 字节,累计表大小:4998278 字节
+读取表头成功:[product_id product_name description price quantity added_date]
+文件读取完毕,插入剩余的 1000 条记录到数据库
+成功插入剩余记录,累计总行数:1000
+文件 GTool\gtong_127.0.0.1\products\products_1.csv 处理成功
+表 products 的数据处理完成,耗时:2.0516516s
+表 products 处理完成,成功:true,错误数量:0,总行数:1000
+
+------------ 表 users ------------
+开始处理表:users
+开始处理表 users 的数据
+表 users 的目录路径为:GTool\gtong_127.0.0.1\users
+发现文件:GTool\gtong_127.0.0.1\users\users_1.csv,启动协程进行处理
+开始处理文件:GTool\gtong_127.0.0.1\users\users_1.csv
+打开文件成功:GTool\gtong_127.0.0.1\users\users_1.csv
+文件大小:72247 字节,累计表大小:72247 字节
+读取表头成功:[password created_at is_active user_id username email]
+文件读取完毕,插入剩余的 1000 条记录到数据库
+成功插入剩余记录,累计总行数:1000
+文件 GTool\gtong_127.0.0.1\users\users_1.csv 处理成功
+表 users 的数据处理完成,耗时:1.9984007s
+完成处理表:users
+

+ 8 - 11
internal/config/config.go

@@ -21,7 +21,6 @@ ImportConfig 导入配置结构体
 */
 type ImportConfig struct {
 	DBConfig        DBConfig       // 数据库配置
-	DirPath         string         // 导入文件的目录路径
 	Delimiter       rune           // 导入文件的分隔符
 	PageSize        int            // 分页读取的大小
 	FilenamePattern *regexp.Regexp // 文件名匹配的正则表达式
@@ -33,7 +32,6 @@ ExportConfig 导出配置结构体
 */
 type ExportConfig struct {
 	DBConfig     DBConfig // 数据库配置
-	TableName    string   // 要导出的表名
 	BaseFilePath string   // 导出文件的基本路径
 	Delimiter    rune     // CSV 的分隔符
 	MaxFileSize  int64    // 单个文件的最大大小 (字节)
@@ -63,7 +61,7 @@ func LoadConfig() (*Config, error) {
 	}
 
 	// 解析 Import 配置
-	importPattern, err := regexp.Compile(viper.GetString("import.filename_pattern"))
+	importPattern, err := regexp.Compile(viper.GetString("import.cfg.filename_pattern"))
 	if err != nil {
 		return nil, fmt.Errorf("error compiling filename pattern: %v", err)
 	}
@@ -76,9 +74,9 @@ func LoadConfig() (*Config, error) {
 			IP:       viper.GetString("import.ip"),
 			Port:     viper.GetInt("import.port"),
 		},
-		DirPath:         viper.GetString("import.dir_path"),
-		Delimiter:       rune(viper.GetString("import.delimiter")[0]),
-		PageSize:        viper.GetInt("import.page_size"),
+
+		Delimiter:       rune(viper.GetString("import.cfg.delimiter")[0]),
+		PageSize:        viper.GetInt("import.cfg.page_size"),
 		FilenamePattern: importPattern,
 	}
 
@@ -92,11 +90,10 @@ func LoadConfig() (*Config, error) {
 			IP:       viper.GetString("export.ip"),
 			Port:     viper.GetInt("export.port"),
 		},
-		TableName:    viper.GetString("export.table_name"),
-		BaseFilePath: viper.GetString("export.base_file_path"),
-		Delimiter:    rune(viper.GetString("export.delimiter")[0]),
-		MaxFileSize:  viper.GetInt64("export.max_file_size"),
-		PageSize:     viper.GetInt("export.page_size"),
+		BaseFilePath: viper.GetString("export.cfg.base_file_path"),
+		Delimiter:    rune(viper.GetString("export.cfg.delimiter")[0]),
+		MaxFileSize:  viper.GetInt64("export.cfg.max_file_size"),
+		PageSize:     viper.GetInt("export.cfg.page_size"),
 	}
 
 	return &Config{

+ 23 - 13
internal/controllers/export_controller.go

@@ -10,7 +10,7 @@ import (
 	"xg_fetl/internal/utils"
 )
 
-func ExportController(outputDir string) {
+func ExportController(tableName []string) {
 
 	dbType := global.Cfg.ExportConfig.DBConfig.DBType
 	dsn := global.Cfg.ExportConfig.DBConfig.GetDSN()
@@ -37,7 +37,12 @@ func ExportController(outputDir string) {
 	}
 
 	// !创建存放路径
-	outputDir = "./GTool" + "/" + dbName + "_" + dbIp
+	var outputDir string
+	if global.Cfg.ExportConfig.BaseFilePath == "" {
+		outputDir = global.Cfg.ExportConfig.BaseFilePath + "/" + dbName + "_" + dbIp
+	} else {
+		outputDir = "./GTool" + "/" + dbName + "_" + dbIp
+	}
 	if err := utils.CreateDirectory(outputDir); err != nil {
 		fmt.Println(fmt.Errorf("创建文件夹失败,可能已存在同名文件夹 %s, err: %v", outputDir, err))
 		return
@@ -49,16 +54,6 @@ func ExportController(outputDir string) {
 	WriteDatabaseInfoToFile(dbInfo, dbInfoFile)
 
 	// !导出
-	// // 创建一个空的 tableNames 数组,用于存放表名
-	// tableNames := make([]string, 0, len(dbInfo.Tables)+len(dbInfo.LobTables))
-	// // 将 tableInfoMap 中的表名添加到 tableNames 中
-	// for tableName := range dbInfo.Tables {
-	// 	tableNames = append(tableNames, tableName)
-	// }
-	// // 将 lobTableInfoMap 中的表名添加到 tableNames 中
-	// for tableName := range dbInfo.LobTables {
-	// 	tableNames = append(tableNames, tableName)
-	// }
 	// 创建一个新的 map,将普通表和 LOB 表的信息都放进去
 	allTables := make(map[string]models.TableInfo, len(dbInfo.Tables)+len(dbInfo.LobTables))
 
@@ -71,7 +66,22 @@ func ExportController(outputDir string) {
 	for tableName, tableInfo := range dbInfo.LobTables {
 		allTables[tableName] = tableInfo
 	}
-	services.WriteMain(executor, allTables, outputDir, delimiter, maxFileSize, pageSize)
+
+	if len(tableName) != 0 {
+		// 创建一个新的map来存储根据tableName筛选后的表信息
+		selectedTables := make(map[string]models.TableInfo)
+
+		for _, name := range tableName {
+			if tableInfo, ok := allTables[name]; ok {
+				selectedTables[name] = tableInfo
+			}
+		}
+		services.WriteMain(executor, selectedTables, outputDir, delimiter, maxFileSize, pageSize)
+
+	} else {
+		services.WriteMain(executor, allTables, outputDir, delimiter, maxFileSize, pageSize)
+	}
+
 }
 
 // 打印 DatabaseInfo 的函数

+ 154 - 0
internal/services/logs.go

@@ -0,0 +1,154 @@
+package services
+
+import (
+	"bytes"
+	"fmt"
+	"io/ioutil"
+	"log"
+	"xg_fetl/internal/models"
+)
+
+// 将总览信息和按表分类的详细日志写入日志文件
+func WriteExportReportToLogFile(logFilePath string, resultsMap map[string]TableResult, tableInfos map[string]models.TableInfo) error {
+	var logBuffer bytes.Buffer
+
+	// 构建总览信息
+	logBuffer.WriteString("========== 导出结果总览 ==========\n")
+	var successTables []string
+	var failedTables []TableResult
+	for _, result := range resultsMap {
+		if result.Success {
+			successTables = append(successTables, result.TableName)
+		} else {
+			failedTables = append(failedTables, result)
+		}
+	}
+	logBuffer.WriteString(fmt.Sprintf("成功导出的表(%d 个):\n", len(successTables)))
+	for _, tableName := range successTables {
+		result := resultsMap[tableName]
+		avgRowSizeKB := float64(result.AverageRowSize) / 1024.0
+		tableSizeMB := float64(result.TableSize) / (1024.0 * 1024.0)
+		exportTime := result.ExportDuration.Seconds()
+		estimatedTime := result.EstimatedDuration.Seconds()
+		logBuffer.WriteString(fmt.Sprintf("- %s\n", tableName))
+		logBuffer.WriteString(fmt.Sprintf("  总行数:%d\n", result.TotalRows))
+		logBuffer.WriteString(fmt.Sprintf("  平均行大小:%.2f KB\n", avgRowSizeKB))
+		logBuffer.WriteString(fmt.Sprintf("  表的容量:%.2f MB\n", tableSizeMB))
+		logBuffer.WriteString(fmt.Sprintf("  估算导出耗时:%.2f 秒\n", estimatedTime))
+		logBuffer.WriteString(fmt.Sprintf("  实际导出耗时:%.2f 秒\n", exportTime))
+		logBuffer.WriteString(fmt.Sprintf("  总错误次数:%d\n", result.ErrorCount))
+		logBuffer.WriteString(fmt.Sprintf("  使用协程数:%d\n", result.GoroutineCount))
+	}
+	logBuffer.WriteString("\n")
+	if len(failedTables) > 0 {
+		logBuffer.WriteString(fmt.Sprintf("导出失败的表(%d 个):\n", len(failedTables)))
+		for _, failedTable := range failedTables {
+			logBuffer.WriteString(fmt.Sprintf("- %s\n", failedTable.TableName))
+			logBuffer.WriteString(fmt.Sprintf("  错误次数:%d\n", failedTable.ErrorCount))
+			logBuffer.WriteString(fmt.Sprintf("  使用协程数:%d\n", failedTable.GoroutineCount))
+			logBuffer.WriteString(fmt.Sprintf("  错误信息:%v\n", failedTable.Error))
+		}
+	} else {
+		logBuffer.WriteString("所有表均导出成功。\n")
+	}
+	logBuffer.WriteString("\n")
+	logBuffer.WriteString("========== 详细日志信息 ==========\n")
+
+	// 按表名排序,输出每个表的详细日志信息
+	for _, tableName := range sortedTableNames(tableInfos) {
+		logBuffer.WriteString(fmt.Sprintf("------------ 表 %s ------------\n", tableName))
+		result, exists := resultsMap[tableName]
+		if exists {
+			for _, msg := range result.Logs {
+				logBuffer.WriteString(msg)
+			}
+			logBuffer.WriteString("\n")
+		} else {
+			logBuffer.WriteString(fmt.Sprintf("表 %s 的处理结果不存在。\n", tableName))
+		}
+	}
+
+	// 将新的日志内容写回日志文件
+	err := ioutil.WriteFile(logFilePath, logBuffer.Bytes(), 0644)
+	if err != nil {
+		return fmt.Errorf("无法写入日志文件: %v", err)
+	}
+
+	return nil
+}
+
+// WriteImportReportToLogFile 函数将总览信息和按表分类的详细日志写入日志文件
+func WriteImportReportToLogFile(logFilePath string, resultsMap map[string]TableResult, tableInfos map[string]models.TableInfo) error {
+	var logBuffer bytes.Buffer
+
+	// 构建总览信息
+	logBuffer.WriteString("========== 导入结果总览 ==========\n")
+	var successTables []string
+	var failedTables []TableResult
+	for _, result := range resultsMap {
+		if result.Success {
+			successTables = append(successTables, result.TableName)
+		} else {
+			failedTables = append(failedTables, result)
+		}
+	}
+	logBuffer.WriteString(fmt.Sprintf("成功导入的表(%d 个):\n", len(successTables)))
+	for _, tableName := range successTables {
+		result := resultsMap[tableName]
+		avgRowSizeKB := float64(result.AverageRowSize) / 1024.0
+		tableSizeMB := float64(result.TableSize) / (1024.0 * 1024.0)
+		exportTime := result.ExportDuration.Seconds()
+		estimatedTime := result.EstimatedDuration.Seconds()
+		logBuffer.WriteString(fmt.Sprintf("- %s\n", tableName))
+		logBuffer.WriteString(fmt.Sprintf("  总行数:%d\n", result.TotalRows))
+		logBuffer.WriteString(fmt.Sprintf("  平均行大小:%.2f KB\n", avgRowSizeKB))
+		logBuffer.WriteString(fmt.Sprintf("  表的容量:%.2f MB\n", tableSizeMB))
+		logBuffer.WriteString(fmt.Sprintf("  估算导入耗时:%.2f 秒\n", estimatedTime))
+		logBuffer.WriteString(fmt.Sprintf("  实际导入耗时:%.2f 秒\n", exportTime))
+		logBuffer.WriteString(fmt.Sprintf("  总错误次数:%d\n", result.ErrorCount))
+		logBuffer.WriteString(fmt.Sprintf("  使用协程数:%d\n", result.GoroutineCount))
+	}
+	logBuffer.WriteString("\n")
+	if len(failedTables) > 0 {
+		logBuffer.WriteString(fmt.Sprintf("导入失败的表(%d 个):\n", len(failedTables)))
+		for _, failedTable := range failedTables {
+			logBuffer.WriteString(fmt.Sprintf("- %s\n", failedTable.TableName))
+			logBuffer.WriteString(fmt.Sprintf("  错误次数:%d\n", failedTable.ErrorCount))
+			logBuffer.WriteString(fmt.Sprintf("  使用协程数:%d\n", failedTable.GoroutineCount))
+			logBuffer.WriteString(fmt.Sprintf("  错误信息:%v\n", failedTable.Error))
+		}
+	} else {
+		logBuffer.WriteString("所有表均导入成功。\n")
+	}
+	logBuffer.WriteString("\n")
+	logBuffer.WriteString("========== 详细日志信息 ==========\n")
+
+	// 按表名排序,输出每个表的详细日志信息
+	for _, tableName := range sortedTableNames(tableInfos) {
+		logBuffer.WriteString(fmt.Sprintf("------------ 表 %s ------------\n", tableName))
+		result, exists := resultsMap[tableName]
+		if exists {
+			for _, msg := range result.Logs {
+				logBuffer.WriteString(msg)
+			}
+			logBuffer.WriteString("\n")
+		} else {
+			logBuffer.WriteString(fmt.Sprintf("表 %s 的处理结果不存在。\n", tableName))
+		}
+	}
+
+	// 将新的日志内容写回日志文件
+	err := ioutil.WriteFile(logFilePath, logBuffer.Bytes(), 0644)
+	if err != nil {
+		return fmt.Errorf("无法写入日志文件: %v", err)
+	}
+
+	return nil
+}
+
+// 日志消息记录函数
+func logMessage(result *TableResult, msg string, logger *log.Logger) {
+	fmt.Print(msg)
+	logger.Print(msg)
+	result.Logs = append(result.Logs, msg)
+}

+ 209 - 212
internal/services/reader_services.go

@@ -3,19 +3,20 @@ package services
 import (
 	"encoding/csv"
 	"fmt"
+	"io"
+	"io/ioutil"
+	"log"
 	"os"
 	"path/filepath"
-	"runtime"
-	"sort"
+	"strings"
 	"sync"
 	"time"
 
-	"regexp"
 	"xg_fetl/internal/db_executor"
 	"xg_fetl/internal/models"
 )
 
-// ReaderMain 主函数,参数化配置,使其更加灵活
+// ReaderMain 函数读取 CSV 文件并将数据插入数据库
 func ReaderMain(
 	dbName string, // 数据库名称
 	dirPath string, // CSV 文件所在的根目录
@@ -24,262 +25,258 @@ func ReaderMain(
 	pageSize int, // 分页大小,每次读取的记录数
 	mysqeExec db_executor.DBExecutor, // 数据库执行器接口
 ) {
-	// 创建用于接收每个表处理结果的通道
-	resultChan := make(chan TableResult)
-	// 创建等待组,用于等待所有表的处理完成
+	// 创建日志文件
+	logFile, err := os.OpenFile("process.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
+	if err != nil {
+		fmt.Printf("无法创建日志文件:%v\n", err)
+		return
+	}
+	defer logFile.Close()
+
+	// 设置日志输出到文件和控制台
+	multiWriter := io.MultiWriter(os.Stdout, logFile)
+	logger := log.New(multiWriter, "", log.LstdFlags)
+
+	// 创建一个等待组,用于等待所有表的处理完成
 	var wg sync.WaitGroup
 
-	// 遍历每个表进行处理,使用协程
-	for _, tableName := range sortedTableNames(tableInfos) {
-		wg.Add(1)
+	// 创建一个通道,用于收集每个表的处理结果
+	resultsChan := make(chan TableResult)
 
-		go func(tableName string) {
+	// 遍历每个表,启动协程进行处理
+	for tableName, tableInfo := range tableInfos {
+		wg.Add(1)
+		go func(tableName string, tableInfo models.TableInfo) {
 			defer wg.Done()
 
-			// 开始处理表,记录开始时间
-			startTime := time.Now()
-
 			// 初始化 TableResult
-			tableResult := TableResult{
+			result := TableResult{
 				TableName:      tableName,
-				Success:        false,
-				GoroutineCount: 1, // 只使用一个协程
+				Success:        true,
+				GeneratedFiles: 0,
+				ErrorCount:     0,
+				GoroutineCount: 0,
+				TotalRows:      0,
+				AverageRowSize: 0,
+				ExportDuration: 0,
+				TableSize:      0,
 				Logs:           []string{},
 			}
 
-			// 构建表名对应的文件夹路径,例如 dirPath/tableName
-			tableDirPath := filepath.Join(dirPath, tableName)
-
-			// 检查表名对应的文件夹是否存在
-			if _, err := os.Stat(tableDirPath); os.IsNotExist(err) {
-				errMsg := fmt.Sprintf("表 %s 对应的目录不存在: %v", tableName, err)
-				tableResult.Error = fmt.Errorf(errMsg)
-				tableResult.Logs = append(tableResult.Logs, errMsg)
-				resultChan <- tableResult
-				return
-			}
-
-			// 编译匹配 CSV 文件名的正则表达式,格式为 表名_[序号].csv
-			filenamePattern := fmt.Sprintf(`^%s_\d+\.csv$`, regexp.QuoteMeta(tableName))
-			re, err := regexp.Compile(filenamePattern)
-			if err != nil {
-				errMsg := fmt.Sprintf("无法编译文件名正则表达式: %v", err)
-				tableResult.Error = fmt.Errorf(errMsg)
-				tableResult.Logs = append(tableResult.Logs, errMsg)
-				resultChan <- tableResult
-				return
-			}
-
-			// 获取表目录下的所有 CSV 文件路径
-			files, err := GetCSVFiles(tableDirPath)
-			if err != nil {
-				errMsg := fmt.Sprintf("无法获取 CSV 文件: %v", err)
-				tableResult.Error = fmt.Errorf(errMsg)
-				tableResult.Logs = append(tableResult.Logs, errMsg)
-				resultChan <- tableResult
-				return
-			}
-
-			// 过滤出符合命名规则的文件列表,并按文件名排序
-			var matchedFiles []string
-			for _, filePath := range files {
-				filename := filepath.Base(filePath)
-				if re.MatchString(filename) {
-					matchedFiles = append(matchedFiles, filePath)
-				}
-			}
-
-			if len(matchedFiles) == 0 {
-				errMsg := fmt.Sprintf("表 %s 没有找到符合条件的 CSV 文件", tableName)
-				tableResult.Error = fmt.Errorf(errMsg)
-				tableResult.Logs = append(tableResult.Logs, errMsg)
-				resultChan <- tableResult
-				return
-			}
-
-			// 按文件名排序,确保顺序一致
-			sort.Strings(matchedFiles)
+			// 记录“开始处理表”日志
+			logMessage(&result, fmt.Sprintf("开始处理表:%s\n", tableName), logger)
 
-			// 估算可用内存,决定一次加载多少个文件
-			availableMemory := getAvailableMemory()
-			var batchFiles []string
-			var totalSize int64 = 0
-			fmt.Println("可用内存:", availableMemory)
-			for idx, filePath := range matchedFiles {
-				fileInfo, err := os.Stat(filePath)
-				if err != nil {
-					tableResult.ErrorCount++
-					logMsg := fmt.Sprintf("无法获取文件信息 %s: %v", filePath, err)
-					tableResult.Logs = append(tableResult.Logs, logMsg)
-					continue
-				}
-				fileSize := fileInfo.Size()
-
-				// 判断是否超过可用内存
-				if totalSize+fileSize > availableMemory && len(batchFiles) > 0 {
-					// 处理当前批次文件
-					fmt.Println("处理当前批次文件", batchFiles)
-					err = processCSVFiles(batchFiles, delimiter, mysqeExec, tableName, &tableResult)
-					if err != nil {
-						tableResult.ErrorCount++
-						logMsg := fmt.Sprintf("处理文件批次时出错: %v", err)
-						tableResult.Logs = append(tableResult.Logs, logMsg)
-					}
-					// 重置批次
-					batchFiles = []string{}
-					fmt.Println("重置批次")
-					totalSize = 0
-				}
-
-				batchFiles = append(batchFiles, filePath)
-				totalSize += fileSize
-
-				// 如果是最后一个文件,处理剩余的批次
-				if idx == len(matchedFiles)-1 && len(batchFiles) > 0 {
-					fmt.Println(",处理剩余的批次 ", batchFiles)
-					err = processCSVFiles(batchFiles, delimiter, mysqeExec, tableName, &tableResult)
-					if err != nil {
-						tableResult.ErrorCount++
-						logMsg := fmt.Sprintf("处理文件批次时出错: %v", err)
-						tableResult.Logs = append(tableResult.Logs, logMsg)
-					}
-				}
-			}
+			// 处理单个表
+			result = processTable(dbName, dirPath, tableName, tableInfo, delimiter, pageSize, mysqeExec, &result, logger)
 
-			// 计算导出持续时间
-			tableResult.ExportDuration = time.Since(startTime)
-			tableResult.Success = tableResult.ErrorCount == 0
+			// 将结果发送到通道
+			resultsChan <- result
 
-			resultChan <- tableResult
-		}(tableName)
+			// 记录“完成处理表”日志
+			logMessage(&result, fmt.Sprintf("完成处理表:%s\n", tableName), logger)
+		}(tableName, tableInfo)
 	}
 
-	// 开启一个协程来收集结果并打印
+	// 启动一个协程,等待所有表处理完毕后关闭结果通道
 	go func() {
 		wg.Wait()
-		close(resultChan)
+		close(resultsChan)
 	}()
 
-	// 打印结果
-	for result := range resultChan {
-		if result.Success {
-			fmt.Printf("表 %s 导入成功,耗时 %v,共插入 %d 行数据。\n", result.TableName, result.ExportDuration, result.TotalRows)
-		} else {
-			fmt.Printf("表 %s 导入完成,存在错误,错误数量 %d,耗时 %v。\n", result.TableName, result.ErrorCount, result.ExportDuration)
-		}
-		// 打印详细日志
-		for _, logMsg := range result.Logs {
-			fmt.Println(logMsg)
-		}
+	// 收集所有处理结果到 resultsMap
+	resultsMap := make(map[string]TableResult)
+
+	// 从结果通道接收每个表的处理结果
+	for result := range resultsChan {
+		// 处理结果,例如输出日志或进行其他处理
+		logMessage(&result, fmt.Sprintf("表 %s 处理完成,成功:%v,错误数量:%d,总行数:%d\n",
+			result.TableName, result.Success, result.ErrorCount, result.TotalRows), logger)
+
+		// 将结果存储到 resultsMap
+		resultsMap[result.TableName] = result
+	}
+
+	// 调用 WriteImportReportToLogFile 将总览信息和详细日志写入日志文件
+	err = WriteImportReportToLogFile("import_report.log", resultsMap, tableInfos)
+	if err != nil {
+		logger.Printf("写入导入报告失败:%v\n", err)
+	} else {
+		logger.Println("导入报告已成功写入 import_report.log")
 	}
 }
 
-// processCSVFiles 处理一批 CSV 文件
-func processCSVFiles(
-	files []string,
+// 处理单个表的函数
+func processTable(
+	dbName string,
+	dirPath string,
+	tableName string,
+	tableInfo models.TableInfo,
 	delimiter rune,
+	pageSize int,
 	mysqeExec db_executor.DBExecutor,
-	tableName string,
-	tableResult *TableResult,
-) error {
-	for _, filePath := range files {
-		logMsg := fmt.Sprintf("开始读入文件: %s", filePath)
-		fmt.Println(logMsg)
-		tableResult.Logs = append(tableResult.Logs, logMsg)
+	result *TableResult,
+	logger *log.Logger,
+) TableResult {
+	logMessage(result, fmt.Sprintf("开始处理表 %s 的数据\n", tableName), logger)
 
-		// 读取整个 CSV 文件到内存中
-		headers, records, err := ReadEntireCSV(filePath, delimiter)
-		if err != nil {
-			tableResult.ErrorCount++
-			logMsg = fmt.Sprintf("无法读取 CSV 文件 %s: %v", filePath, err)
-			fmt.Println(logMsg)
-			tableResult.Logs = append(tableResult.Logs, logMsg)
-			continue
-		}
+	// 记录开始时间
+	startTime := time.Now()
+
+	// 构建表对应的目录路径
+	tableDir := filepath.Join(dirPath, tableName)
+	logMessage(result, fmt.Sprintf("表 %s 的目录路径为:%s\n", tableName, tableDir), logger)
 
-		// 更新总行数和平均行大小
-		tableResult.TotalRows += len(records)
-		if len(records) > 0 {
-			fileInfo, statErr := os.Stat(filePath)
-			if statErr == nil && fileInfo.Size() > 0 {
-				rowSize := fileInfo.Size() / int64(len(records))
-				// 计算平均行大小
-				if tableResult.AverageRowSize == 0 {
-					tableResult.AverageRowSize = rowSize
+	// 获取该表目录下的所有 CSV 文件
+	files, err := ioutil.ReadDir(tableDir)
+	if err != nil {
+		result.Success = false
+		result.Error = err
+		logMsg := fmt.Sprintf("读取目录失败:%v\n", err)
+		logMessage(result, logMsg, logger)
+		return *result
+	}
+
+	// 用于统计行大小的变量
+	var totalRowSize int64 = 0
+
+	// 遍历所有 CSV 文件,启动协程处理
+	var fileWg sync.WaitGroup
+
+	// 控制最大并发数
+	maxGoroutines := 5
+	semaphore := make(chan struct{}, maxGoroutines)
+
+	for _, file := range files {
+		if !file.IsDir() && strings.HasPrefix(file.Name(), tableName+"_") && strings.HasSuffix(file.Name(), ".csv") {
+			// 满足条件的文件
+			filePath := filepath.Join(tableDir, file.Name())
+			result.GeneratedFiles++
+			fileWg.Add(1)
+			result.GoroutineCount++
+
+			logMessage(result, fmt.Sprintf("发现文件:%s,启动协程进行处理\n", filePath), logger)
+
+			go func(filePath string) {
+				defer fileWg.Done()
+				semaphore <- struct{}{}        // 获取信号量
+				defer func() { <-semaphore }() // 释放信号量
+
+				logMessage(result, fmt.Sprintf("开始处理文件:%s\n", filePath), logger)
+
+				// 处理单个 CSV 文件
+				err := processCSVFile(filePath, tableName, delimiter, pageSize, mysqeExec, result, &totalRowSize, logger)
+				if err != nil {
+					result.ErrorCount++
+					logMsg := fmt.Sprintf("处理文件 %s 失败:%v\n", filePath, err)
+					logMessage(result, logMsg, logger)
 				} else {
-					tableResult.AverageRowSize = (tableResult.AverageRowSize + rowSize) / 2
+					logMessage(result, fmt.Sprintf("文件 %s 处理成功\n", filePath), logger)
 				}
-			}
+			}(filePath)
 		}
+	}
 
-		// 插入数据到数据库
-		err = mysqeExec.InsertRecordsToDB(tableName, headers, records)
-		if err != nil {
-			tableResult.ErrorCount++
-			logMsg = fmt.Sprintf("无法将记录插入数据库: %v", err)
-			fmt.Println(logMsg)
-			tableResult.Logs = append(tableResult.Logs, logMsg)
-			continue
-		}
+	// 等待所有文件处理完毕
+	fileWg.Wait()
 
-		logMsg = fmt.Sprintf("完成导入文件: %s,共插入 %d 行数据。", filePath, len(records))
-		fmt.Println(logMsg)
-		tableResult.Logs = append(tableResult.Logs, logMsg)
+	// 计算统计信息
+	result.ExportDuration = time.Since(startTime)
+	if result.TotalRows > 0 {
+		result.AverageRowSize = totalRowSize / int64(result.TotalRows)
 	}
-	return nil
+
+	logMessage(result, fmt.Sprintf("表 %s 的数据处理完成,耗时:%v\n", tableName, result.ExportDuration), logger)
+
+	return *result
 }
 
-// ReadEntireCSV 读取整个 CSV 文件到内存中
-func ReadEntireCSV(filePath string, delimiter rune) ([]string, [][]string, error) {
+// 处理单个 CSV 文件的函数
+func processCSVFile(
+	filePath string,
+	tableName string,
+	delimiter rune,
+	pageSize int,
+	mysqeExec db_executor.DBExecutor,
+	result *TableResult,
+	totalRowSize *int64,
+	logger *log.Logger,
+) error {
 	// 打开 CSV 文件
 	file, err := os.Open(filePath)
 	if err != nil {
-		return nil, nil, fmt.Errorf("无法打开文件: %v", err)
+		return fmt.Errorf("打开文件失败:%v", err)
 	}
 	defer file.Close()
 
-	// 创建 CSV Reader,设置分隔符
+	logMessage(result, fmt.Sprintf("打开文件成功:%s\n", filePath), logger)
+
+	// 获取文件大小,更新表的容量
+	fileInfo, err := file.Stat()
+	if err == nil {
+		result.TableSize += fileInfo.Size()
+		logMessage(result, fmt.Sprintf("文件大小:%d 字节,累计表大小:%d 字节\n", fileInfo.Size(), result.TableSize), logger)
+	}
+
+	// 创建 CSV Reader
 	reader := csv.NewReader(file)
 	reader.Comma = delimiter
 
-	// 读取所有数据
-	records, err := reader.ReadAll()
+	// 读取表头
+	headers, err := reader.Read()
 	if err != nil {
-		return nil, nil, fmt.Errorf("无法读取 CSV 文件: %v", err)
+		return fmt.Errorf("读取表头失败:%v", err)
 	}
+	logMessage(result, fmt.Sprintf("读取表头成功:%v\n", headers), logger)
+
+	// 初始化记录批次
+	var recordsBatch [][]string
+
+	for {
+		// 读取一行记录
+		record, err := reader.Read()
+		if err == io.EOF {
+			// 文件读取完毕,插入剩余的记录
+			if len(recordsBatch) > 0 {
+				logMessage(result, fmt.Sprintf("文件读取完毕,插入剩余的 %d 条记录到数据库\n", len(recordsBatch)), logger)
+				err = mysqeExec.InsertRecordsToDB(tableName, headers, recordsBatch)
+				if err != nil {
+					return fmt.Errorf("插入数据库失败:%v", err)
+				}
+				// 更新总行数
+				result.TotalRows += len(recordsBatch)
+				// 计算行大小
+				for _, rec := range recordsBatch {
+					*totalRowSize += int64(len(strings.Join(rec, "")))
+				}
+				logMessage(result, fmt.Sprintf("成功插入剩余记录,累计总行数:%d\n", result.TotalRows), logger)
+			}
+			break
+		}
+		if err != nil {
+			return fmt.Errorf("读取记录失败:%v", err)
+		}
 
-	if len(records) == 0 {
-		return nil, nil, fmt.Errorf("CSV 文件为空")
-	}
-
-	// 第一个记录是表头
-	headers := records[0]
-	dataRecords := records[1:] // 跳过表头
-
-	return headers, dataRecords, nil
-}
+		// 添加到批次
+		recordsBatch = append(recordsBatch, record)
 
-// GetCSVFiles 获取指定目录下的所有 CSV 文件路径
-func GetCSVFiles(dirPath string) ([]string, error) {
-	var files []string
-	// 遍历目录,查找所有 .csv 文件
-	entries, err := os.ReadDir(dirPath)
-	if err != nil {
-		return nil, fmt.Errorf("无法读取目录 %s: %v", dirPath, err)
-	}
-	for _, entry := range entries {
-		if !entry.IsDir() && filepath.Ext(entry.Name()) == ".csv" {
-			files = append(files, filepath.Join(dirPath, entry.Name()))
+		// 如果达到批次大小,插入数据库
+		if len(recordsBatch) >= pageSize {
+			logMessage(result, fmt.Sprintf("达到批次大小 %d,插入记录到数据库\n", pageSize), logger)
+			err = mysqeExec.InsertRecordsToDB(tableName, headers, recordsBatch)
+			if err != nil {
+				return fmt.Errorf("插入数据库失败:%v", err)
+			}
+			// 更新总行数
+			result.TotalRows += len(recordsBatch)
+			// 计算行大小
+			for _, rec := range recordsBatch {
+				*totalRowSize += int64(len(strings.Join(rec, "")))
+			}
+			logMessage(result, fmt.Sprintf("成功插入记录,累计总行数:%d\n", result.TotalRows), logger)
+			// 清空批次
+			recordsBatch = recordsBatch[:0]
 		}
 	}
-	return files, nil
-}
 
-// getAvailableMemory 获取可用的系统内存
-func getAvailableMemory() int64 {
-	var m runtime.MemStats
-	runtime.ReadMemStats(&m)
-	// 假设使用可用内存的 80%
-	availableMemory := int64(m.Sys - m.Alloc)
-	return availableMemory * 80 / 100
+	return nil
 }

+ 61 - 136
internal/services/writer_services.go

@@ -1,7 +1,6 @@
 package services
 
 import (
-	"bytes"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -108,124 +107,6 @@ func sortedTableNames(tableInfos map[string]models.TableInfo) []string {
 	return tableNames
 }
 
-// 日志消息记录函数
-func logMessage(result *TableResult, msg string, logger *log.Logger) {
-	fmt.Print(msg)
-	logger.Print(msg)
-	result.Logs = append(result.Logs, msg)
-}
-
-// 估算数据传输速率的函数
-func estimateDataTransferRate() (float64, error) {
-	// 创建临时文件
-	tmpFile, err := ioutil.TempFile("", "data_transfer_test")
-	if err != nil {
-		return 0, err
-	}
-	defer os.Remove(tmpFile.Name()) // 确保测试完成后删除临时文件
-	defer tmpFile.Close()
-
-	// 准备测试数据,大小为 50 MB
-	testDataSize := 50 * 1024 * 1024 // 50 MB
-	testData := make([]byte, testDataSize)
-
-	// 写入测试数据并测量时间
-	startTime := time.Now()
-	_, err = tmpFile.Write(testData)
-	if err != nil {
-		return 0, err
-	}
-	writeDuration := time.Since(startTime)
-
-	// 计算写入速率
-	writeRate := float64(testDataSize) / writeDuration.Seconds()
-
-	// 读取测试数据并测量时间
-	tmpFile.Seek(0, io.SeekStart)
-	readBuffer := make([]byte, testDataSize)
-	startTime = time.Now()
-	_, err = io.ReadFull(tmpFile, readBuffer)
-	if err != nil && err != io.EOF {
-		return 0, err
-	}
-	readDuration := time.Since(startTime)
-
-	// 计算读取速率
-	readRate := float64(testDataSize) / readDuration.Seconds()
-
-	// 取读写速率的平均值作为数据传输速率
-	dataTransferRate := (writeRate + readRate) / 2
-
-	return dataTransferRate, nil
-}
-
-// 将总览信息和按表分类的详细日志写入日志文件
-func writeLogFile(logFilePath string, resultsMap map[string]TableResult, tableInfos map[string]models.TableInfo) error {
-	var logBuffer bytes.Buffer
-
-	// 构建总览信息
-	logBuffer.WriteString("========== 导出结果总览 ==========\n")
-	var successTables []string
-	var failedTables []TableResult
-	for _, result := range resultsMap {
-		if result.Success {
-			successTables = append(successTables, result.TableName)
-		} else {
-			failedTables = append(failedTables, result)
-		}
-	}
-	logBuffer.WriteString(fmt.Sprintf("成功导出的表(%d 个):\n", len(successTables)))
-	for _, tableName := range successTables {
-		result := resultsMap[tableName]
-		avgRowSizeKB := float64(result.AverageRowSize) / 1024.0
-		tableSizeMB := float64(result.TableSize) / (1024.0 * 1024.0)
-		exportTime := result.ExportDuration.Seconds()
-		estimatedTime := result.EstimatedDuration.Seconds()
-		logBuffer.WriteString(fmt.Sprintf("- %s\n", tableName))
-		logBuffer.WriteString(fmt.Sprintf("  总行数:%d\n", result.TotalRows))
-		logBuffer.WriteString(fmt.Sprintf("  平均行大小:%.2f KB\n", avgRowSizeKB))
-		logBuffer.WriteString(fmt.Sprintf("  表的容量:%.2f MB\n", tableSizeMB))
-		logBuffer.WriteString(fmt.Sprintf("  估算导出耗时:%.2f 秒\n", estimatedTime))
-		logBuffer.WriteString(fmt.Sprintf("  实际导出耗时:%.2f 秒\n", exportTime))
-		logBuffer.WriteString(fmt.Sprintf("  使用协程数:%d\n", result.GoroutineCount))
-	}
-	logBuffer.WriteString("\n")
-	if len(failedTables) > 0 {
-		logBuffer.WriteString(fmt.Sprintf("导出失败的表(%d 个):\n", len(failedTables)))
-		for _, failedTable := range failedTables {
-			logBuffer.WriteString(fmt.Sprintf("- %s\n", failedTable.TableName))
-			logBuffer.WriteString(fmt.Sprintf("  错误次数:%d\n", failedTable.ErrorCount))
-			logBuffer.WriteString(fmt.Sprintf("  使用协程数:%d\n", failedTable.GoroutineCount))
-			logBuffer.WriteString(fmt.Sprintf("  错误信息:%v\n", failedTable.Error))
-		}
-	} else {
-		logBuffer.WriteString("所有表均导出成功。\n")
-	}
-	logBuffer.WriteString("\n")
-	logBuffer.WriteString("========== 详细日志信息 ==========\n")
-
-	// 按表名排序,输出每个表的详细日志信息
-	for _, tableName := range sortedTableNames(tableInfos) {
-		logBuffer.WriteString(fmt.Sprintf("------------ 表 %s ------------\n", tableName))
-		result, exists := resultsMap[tableName]
-		if exists {
-			for _, msg := range result.Logs {
-				logBuffer.WriteString(msg)
-			}
-			logBuffer.WriteString("\n")
-		} else {
-			logBuffer.WriteString(fmt.Sprintf("表 %s 的处理结果不存在。\n", tableName))
-		}
-	}
-
-	// 将新的日志内容写回日志文件
-	err := ioutil.WriteFile(logFilePath, logBuffer.Bytes(), 0644)
-	if err != nil {
-		return fmt.Errorf("无法写入日志文件: %v", err)
-	}
-
-	return nil
-}
 
 /**
  * exportTableConcurrently 并发地导出多个表的数据到 CSV 文件。
@@ -258,7 +139,7 @@ func exportTableConcurrently(
 		maxConcurrentFileExports  = 100
 	)
 
-	// 创建日志文件
+	// ! 创建日志文件
 	logFilePath := filepath.Join(baseFilePath, "export.log")
 	logFile, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
 	if err != nil {
@@ -269,7 +150,7 @@ func exportTableConcurrently(
 	// 创建日志记录器,不添加任何前缀或标志
 	logger := log.New(logFile, "", 0)
 
-	// 估算数据传输速率
+	// ! 估算数据传输速率
 	dataTransferRate, err := estimateDataTransferRate()
 	if err != nil {
 		msg := fmt.Sprintf("%s 无法估算数据传输速率,默认使用 50 MB/s。错误信息:%v\n", currentTime(), err)
@@ -282,7 +163,7 @@ func exportTableConcurrently(
 		logger.Print(msg)
 	}
 
-	// 遍历每个表进行处理,使用协程
+	// ! 遍历每个表进行处理,使用协程
 	for _, tableName := range sortedTableNames(tableInfos) {
 		tableInfo := tableInfos[tableName]
 		wg.Add(1)
@@ -303,7 +184,7 @@ func exportTableConcurrently(
 				GoroutineCount: 1, // 包含当前协程
 			}
 
-			// 创建存储当前表的文件夹
+			// !创建存储当前表的文件夹
 			tableDir := filepath.Join(baseFilePath, tableName)
 			if err := os.MkdirAll(tableDir, 0755); err != nil {
 				msg := fmt.Sprintf("%s 创建表 %s 文件夹失败: %v\n", currentTime(), tableName, err)
@@ -319,7 +200,7 @@ func exportTableConcurrently(
 				return
 			}
 
-			// 估算每条记录的平均大小
+			// !估算每条记录的平均大小
 			averageRecordSize, err := estimateAverageRecordSize(*executor, tableName)
 			if err != nil {
 				msg := fmt.Sprintf("%s 无法估算表 %s 平均记录大小: %v\n", currentTime(), tableName, err)
@@ -339,14 +220,14 @@ func exportTableConcurrently(
 			msg := fmt.Sprintf("%s 表 %s 的平均记录大小: %d 字节\n", currentTime(), tableName, averageRecordSize)
 			logMessage(&result, msg, logger)
 
-			// 计算表的容量
+			// !计算表的容量
 			tableSize := averageRecordSize * int64(totalRecords)
 			result.TableSize = tableSize
 			tableSizeMB := float64(tableSize) / (1024.0 * 1024.0)
 			msg = fmt.Sprintf("%s 表 %s 的容量约为:%.2f MB\n", currentTime(), tableName, tableSizeMB)
 			logMessage(&result, msg, logger)
 
-			// 估算导出时间
+			// !估算导出时间
 			estimatedDurationSeconds := float64(tableSize) / dataTransferRate
 			estimatedDuration := time.Duration(estimatedDurationSeconds * float64(time.Second))
 			result.EstimatedDuration = estimatedDuration
@@ -354,7 +235,7 @@ func exportTableConcurrently(
 			msg = fmt.Sprintf("%s 表 %s 估算导出时间:%.2f 秒\n", currentTime(), tableName, estimatedDurationSeconds)
 			logMessage(&result, msg, logger)
 
-			// 计算每个 CSV 文件应包含的记录数
+			// !计算每个 CSV 文件应包含的记录数
 			maxFileSizeBytes := maxFileSizeMB * 1024 * 1024 // 将 MB 转换为字节
 			recordsPerFile := int(maxFileSizeBytes / averageRecordSize)
 			if recordsPerFile <= 0 {
@@ -363,12 +244,12 @@ func exportTableConcurrently(
 			msg = fmt.Sprintf("%s 表 %s 的每个文件应包含的记录数: %d\n", currentTime(), tableName, recordsPerFile)
 			logMessage(&result, msg, logger)
 
-			// 计算需要的 CSV 文件数量
+			// !计算需要的 CSV 文件数量
 			totalFiles := (totalRecords + recordsPerFile - 1) / recordsPerFile
 			msg = fmt.Sprintf("%s 表 %s 需要的 CSV 文件数量: %d\n", currentTime(), tableName, totalFiles)
 			logMessage(&result, msg, logger)
 
-			// 记录导出开始时间
+			//! 记录导出开始时间
 			startTime := time.Now()
 
 			var fileWg sync.WaitGroup
@@ -455,7 +336,7 @@ func exportTableConcurrently(
 	wg.Wait()
 
 	// 写入日志文件
-	err = writeLogFile(logFilePath, resultsMap, tableInfos)
+	err = WriteExportReportToLogFile(logFilePath, resultsMap, tableInfos)
 	if err != nil {
 		return fmt.Errorf("无法写入日志文件: %v", err)
 	}
@@ -533,11 +414,11 @@ func exportDataRangeToCSV(
 	limit int,
 	pageSize int,
 ) error {
-	// 首先确保 CSV 文件存在,创建新文件并写入表头
+	// !首先确保 CSV 文件存在,创建新文件并写入表头
 	var headers []string
 	var headersWritten bool = false
 
-	// 计算需要分页的次数
+	// !计算需要分页的次数
 	totalPages := (limit + pageSize - 1) / pageSize
 
 	for page := 0; page < totalPages; page++ {
@@ -547,22 +428,22 @@ func exportDataRangeToCSV(
 			pageLimit = (offset + limit) - pageOffset
 		}
 
-		// 从数据库读取数据
+		// !从数据库读取数据
 		currentHeaders, records, err := executor.ReadTableFromDBWithPagination(tableName, pageOffset, pageLimit)
 		if err != nil {
 			return fmt.Errorf("failed to read data from table %s: %v", tableName, err)
 		}
 
-		// 仅在首次获取到表头时进行赋值
+		// !仅在首次获取到表头时进行赋值
 		if !headersWritten && currentHeaders != nil {
 			headers = currentHeaders
 			headersWritten = true
 		}
 
-		// 判断是否需要写入表头
+		// !判断是否需要写入表头
 		appendToFile := page > 0 // 第一页时创建新文件,之后的页追加写入
 
-		// 写入数据到 CSV 文件
+		// !写入数据到 CSV 文件
 		err = csvwriter.WriteRecordsToCSV(csvFilePath, headers, records, delimiter, appendToFile)
 		if err != nil {
 			return fmt.Errorf("failed to write data to CSV: %v", err)
@@ -573,3 +454,47 @@ func exportDataRangeToCSV(
 
 	return nil
 }
+
+// 估算数据传输速率的函数
+func estimateDataTransferRate() (float64, error) {
+	// 创建临时文件
+	tmpFile, err := ioutil.TempFile("", "data_transfer_test")
+	if err != nil {
+		return 0, err
+	}
+	defer os.Remove(tmpFile.Name()) // 确保测试完成后删除临时文件
+	defer tmpFile.Close()
+
+	// 准备测试数据,大小为 50 MB
+	testDataSize := 50 * 1024 * 1024 // 50 MB
+	testData := make([]byte, testDataSize)
+
+	// 写入测试数据并测量时间
+	startTime := time.Now()
+	_, err = tmpFile.Write(testData)
+	if err != nil {
+		return 0, err
+	}
+	writeDuration := time.Since(startTime)
+
+	// 计算写入速率
+	writeRate := float64(testDataSize) / writeDuration.Seconds()
+
+	// 读取测试数据并测量时间
+	tmpFile.Seek(0, io.SeekStart)
+	readBuffer := make([]byte, testDataSize)
+	startTime = time.Now()
+	_, err = io.ReadFull(tmpFile, readBuffer)
+	if err != nil && err != io.EOF {
+		return 0, err
+	}
+	readDuration := time.Since(startTime)
+
+	// 计算读取速率
+	readRate := float64(testDataSize) / readDuration.Seconds()
+
+	// 取读写速率的平均值作为数据传输速率
+	dataTransferRate := (writeRate + readRate) / 2
+
+	return dataTransferRate, nil
+}