|
@@ -0,0 +1,575 @@
|
|
|
+package services
|
|
|
+
|
|
|
+import (
|
|
|
+ "bytes"
|
|
|
+ "fmt"
|
|
|
+ "io"
|
|
|
+ "io/ioutil"
|
|
|
+ "log"
|
|
|
+ "os"
|
|
|
+ "path/filepath"
|
|
|
+ "sort"
|
|
|
+ "sync"
|
|
|
+ "time"
|
|
|
+ "xg_fetl/internal/db_executor"
|
|
|
+ "xg_fetl/internal/models"
|
|
|
+ "xg_fetl/module/csv/csvwriter"
|
|
|
+)
|
|
|
+
|
|
|
+/*
|
|
|
+WriteMain 导出数据库表数据到 CSV 文件,基于分页和文件大小进行分片
|
|
|
+参数:
|
|
|
+* executor: 数据库执行器,用于读取数据库表中的数据
|
|
|
+* tableNames: 要导出的表名数组
|
|
|
+* baseFilePath: 导出文件的基本路径,用于生成不同分片的文件名
|
|
|
+* delimiter: CSV 的分隔符
|
|
|
+* maxFileSize: 单个 CSV 文件的最大大小
|
|
|
+* pageSize: 每次分页读取的行数
|
|
|
+*/
|
|
|
+func WriteMain(
|
|
|
+ executor db_executor.DBExecutor,
|
|
|
+ tableNames map[string]models.TableInfo,
|
|
|
+ baseFilePath string,
|
|
|
+ delimiter rune,
|
|
|
+ maxFileSize int64,
|
|
|
+ pageSize int) error {
|
|
|
+
|
|
|
+ {
|
|
|
+ // --------------
|
|
|
+ // fmt.Println("--------tableName: 开始导出表数据--------", tableName)
|
|
|
+ // // 开始分页读取并导出数据
|
|
|
+ // offset := 0
|
|
|
+ // for {
|
|
|
+ // // 从数据库中分页读取数据
|
|
|
+ // headers, records, err := executor.ReadTableFromDBWithPagination(tableName, offset, pageSize)
|
|
|
+ // if err != nil {
|
|
|
+ // log.Fatalf("Failed to read table with pagination: %v", err)
|
|
|
+ // return err
|
|
|
+ // }
|
|
|
+ // fmt.Println("循环")
|
|
|
+ // // 如果没有更多数据,则结束循环
|
|
|
+ // if records == nil {
|
|
|
+ // break
|
|
|
+ // }
|
|
|
+
|
|
|
+ // fmt.Println("baseFilePath: ", baseFilePath)
|
|
|
+ // csvFilePath := fmt.Sprintf("%s/%s", baseFilePath, tableName)
|
|
|
+ // // 将数据写入 CSV 文件中
|
|
|
+ // err = csvwriter.WriteRecordsToCSVWithFileSizeLimit(csvFilePath, headers, records, delimiter, maxFileSize, &fileIndex, ¤tFileSize, ¤tFilePath)
|
|
|
+ // if err != nil {
|
|
|
+ // log.Fatalf("Failed to write records to CSV: %v", err)
|
|
|
+ // return err
|
|
|
+ // }
|
|
|
+
|
|
|
+ // // 增加偏移量,以获取下一页数据
|
|
|
+ // offset += pageSize
|
|
|
+ // }
|
|
|
+ // fmt.Println("--------tableName 完成--------", tableName)
|
|
|
+ // }
|
|
|
+ err := exportTableConcurrently(tableNames, baseFilePath, pageSize, delimiter, &executor, maxFileSize)
|
|
|
+ if err != nil {
|
|
|
+ fmt.Println("导出表数据失败:", err)
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ fmt.Println("Table exported successfully with size limit and pagination")
|
|
|
+ // return nil
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+// 定义用于接收每个表处理结果的结构体
|
|
|
+type TableResult struct {
|
|
|
+ TableName string
|
|
|
+ Success bool
|
|
|
+ Error error
|
|
|
+ GeneratedFiles int
|
|
|
+ ErrorCount int // 记录该表发生的错误数量
|
|
|
+ GoroutineCount int // 记录该表使用的协程数量
|
|
|
+ TotalRows int // 表的总行数
|
|
|
+ AverageRowSize int64 // 平均行大小
|
|
|
+ ExportDuration time.Duration // 导出所花费的时间
|
|
|
+ EstimatedDuration time.Duration // 估算的导出时间
|
|
|
+ TableSize int64 // 表的容量(字节)
|
|
|
+ Logs []string // 保存该表的日志信息
|
|
|
+}
|
|
|
+
|
|
|
+// 获取当前时间字符串
|
|
|
+func currentTime() string {
|
|
|
+ return time.Now().Format("2006-01-02 15:04:05")
|
|
|
+}
|
|
|
+
|
|
|
+// 对表名进行排序,返回排序后的表名列表
|
|
|
+func sortedTableNames(tableInfos map[string]models.TableInfo) []string {
|
|
|
+ tableNames := make([]string, 0, len(tableInfos))
|
|
|
+ for name := range tableInfos {
|
|
|
+ tableNames = append(tableNames, name)
|
|
|
+ }
|
|
|
+ sort.Strings(tableNames)
|
|
|
+ return tableNames
|
|
|
+}
|
|
|
+
|
|
|
+// 日志消息记录函数
|
|
|
+func logMessage(result *TableResult, msg string, logger *log.Logger) {
|
|
|
+ fmt.Print(msg)
|
|
|
+ logger.Print(msg)
|
|
|
+ result.Logs = append(result.Logs, msg)
|
|
|
+}
|
|
|
+
|
|
|
+// 估算数据传输速率的函数
|
|
|
+func estimateDataTransferRate() (float64, error) {
|
|
|
+ // 创建临时文件
|
|
|
+ tmpFile, err := ioutil.TempFile("", "data_transfer_test")
|
|
|
+ if err != nil {
|
|
|
+ return 0, err
|
|
|
+ }
|
|
|
+ defer os.Remove(tmpFile.Name()) // 确保测试完成后删除临时文件
|
|
|
+ defer tmpFile.Close()
|
|
|
+
|
|
|
+ // 准备测试数据,大小为 50 MB
|
|
|
+ testDataSize := 50 * 1024 * 1024 // 50 MB
|
|
|
+ testData := make([]byte, testDataSize)
|
|
|
+
|
|
|
+ // 写入测试数据并测量时间
|
|
|
+ startTime := time.Now()
|
|
|
+ _, err = tmpFile.Write(testData)
|
|
|
+ if err != nil {
|
|
|
+ return 0, err
|
|
|
+ }
|
|
|
+ writeDuration := time.Since(startTime)
|
|
|
+
|
|
|
+ // 计算写入速率
|
|
|
+ writeRate := float64(testDataSize) / writeDuration.Seconds()
|
|
|
+
|
|
|
+ // 读取测试数据并测量时间
|
|
|
+ tmpFile.Seek(0, io.SeekStart)
|
|
|
+ readBuffer := make([]byte, testDataSize)
|
|
|
+ startTime = time.Now()
|
|
|
+ _, err = io.ReadFull(tmpFile, readBuffer)
|
|
|
+ if err != nil && err != io.EOF {
|
|
|
+ return 0, err
|
|
|
+ }
|
|
|
+ readDuration := time.Since(startTime)
|
|
|
+
|
|
|
+ // 计算读取速率
|
|
|
+ readRate := float64(testDataSize) / readDuration.Seconds()
|
|
|
+
|
|
|
+ // 取读写速率的平均值作为数据传输速率
|
|
|
+ dataTransferRate := (writeRate + readRate) / 2
|
|
|
+
|
|
|
+ return dataTransferRate, nil
|
|
|
+}
|
|
|
+
|
|
|
+// 将总览信息和按表分类的详细日志写入日志文件
|
|
|
+func writeLogFile(logFilePath string, resultsMap map[string]TableResult, tableInfos map[string]models.TableInfo) error {
|
|
|
+ var logBuffer bytes.Buffer
|
|
|
+
|
|
|
+ // 构建总览信息
|
|
|
+ logBuffer.WriteString("========== 导出结果总览 ==========\n")
|
|
|
+ var successTables []string
|
|
|
+ var failedTables []TableResult
|
|
|
+ for _, result := range resultsMap {
|
|
|
+ if result.Success {
|
|
|
+ successTables = append(successTables, result.TableName)
|
|
|
+ } else {
|
|
|
+ failedTables = append(failedTables, result)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ logBuffer.WriteString(fmt.Sprintf("成功导出的表(%d 个):\n", len(successTables)))
|
|
|
+ for _, tableName := range successTables {
|
|
|
+ result := resultsMap[tableName]
|
|
|
+ avgRowSizeKB := float64(result.AverageRowSize) / 1024.0
|
|
|
+ tableSizeMB := float64(result.TableSize) / (1024.0 * 1024.0)
|
|
|
+ exportTime := result.ExportDuration.Seconds()
|
|
|
+ estimatedTime := result.EstimatedDuration.Seconds()
|
|
|
+ logBuffer.WriteString(fmt.Sprintf("- %s\n", tableName))
|
|
|
+ logBuffer.WriteString(fmt.Sprintf(" 总行数:%d\n", result.TotalRows))
|
|
|
+ logBuffer.WriteString(fmt.Sprintf(" 平均行大小:%.2f KB\n", avgRowSizeKB))
|
|
|
+ logBuffer.WriteString(fmt.Sprintf(" 表的容量:%.2f MB\n", tableSizeMB))
|
|
|
+ logBuffer.WriteString(fmt.Sprintf(" 估算导出耗时:%.2f 秒\n", estimatedTime))
|
|
|
+ logBuffer.WriteString(fmt.Sprintf(" 实际导出耗时:%.2f 秒\n", exportTime))
|
|
|
+ logBuffer.WriteString(fmt.Sprintf(" 使用协程数:%d\n", result.GoroutineCount))
|
|
|
+ }
|
|
|
+ logBuffer.WriteString("\n")
|
|
|
+ if len(failedTables) > 0 {
|
|
|
+ logBuffer.WriteString(fmt.Sprintf("导出失败的表(%d 个):\n", len(failedTables)))
|
|
|
+ for _, failedTable := range failedTables {
|
|
|
+ logBuffer.WriteString(fmt.Sprintf("- %s\n", failedTable.TableName))
|
|
|
+ logBuffer.WriteString(fmt.Sprintf(" 错误次数:%d\n", failedTable.ErrorCount))
|
|
|
+ logBuffer.WriteString(fmt.Sprintf(" 使用协程数:%d\n", failedTable.GoroutineCount))
|
|
|
+ logBuffer.WriteString(fmt.Sprintf(" 错误信息:%v\n", failedTable.Error))
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ logBuffer.WriteString("所有表均导出成功。\n")
|
|
|
+ }
|
|
|
+ logBuffer.WriteString("\n")
|
|
|
+ logBuffer.WriteString("========== 详细日志信息 ==========\n")
|
|
|
+
|
|
|
+ // 按表名排序,输出每个表的详细日志信息
|
|
|
+ for _, tableName := range sortedTableNames(tableInfos) {
|
|
|
+ logBuffer.WriteString(fmt.Sprintf("------------ 表 %s ------------\n", tableName))
|
|
|
+ result, exists := resultsMap[tableName]
|
|
|
+ if exists {
|
|
|
+ for _, msg := range result.Logs {
|
|
|
+ logBuffer.WriteString(msg)
|
|
|
+ }
|
|
|
+ logBuffer.WriteString("\n")
|
|
|
+ } else {
|
|
|
+ logBuffer.WriteString(fmt.Sprintf("表 %s 的处理结果不存在。\n", tableName))
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 将新的日志内容写回日志文件
|
|
|
+ err := ioutil.WriteFile(logFilePath, logBuffer.Bytes(), 0644)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("无法写入日志文件: %v", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+/**
|
|
|
+ * exportTableConcurrently 并发地导出多个表的数据到 CSV 文件。
|
|
|
+ *
|
|
|
+ * 参数:
|
|
|
+ * - tableInfos: 包含表信息的映射,键为表名,值为 TableInfo。
|
|
|
+ * - baseFilePath: 基础文件路径,用于存储导出的 CSV 文件。
|
|
|
+ * - pageSize: 分页大小,决定每次从数据库读取的记录数量。
|
|
|
+ * - delimiter: CSV 文件的分隔符。
|
|
|
+ * - executor: 数据库执行器,用于执行数据库查询。
|
|
|
+ * - maxFileSizeMB: 单个 CSV 文件的最大大小(以 MB 为单位)。
|
|
|
+ *
|
|
|
+ * 返回值:
|
|
|
+ * - error: 如果发生错误,返回错误信息。
|
|
|
+ */
|
|
|
+func exportTableConcurrently(
|
|
|
+ tableInfos map[string]models.TableInfo,
|
|
|
+ baseFilePath string,
|
|
|
+ pageSize int,
|
|
|
+ delimiter rune,
|
|
|
+ executor *db_executor.DBExecutor,
|
|
|
+ maxFileSizeMB int64,
|
|
|
+) error {
|
|
|
+ var (
|
|
|
+ wg sync.WaitGroup
|
|
|
+ resultsMap = make(map[string]TableResult)
|
|
|
+ resultsMutex sync.Mutex // 用于保护 resultsMap 的并发访问
|
|
|
+ maxConcurrentTableExports = 3
|
|
|
+ tableSemaphore = make(chan struct{}, maxConcurrentTableExports)
|
|
|
+ maxConcurrentFileExports = 100
|
|
|
+ )
|
|
|
+
|
|
|
+ // 创建日志文件
|
|
|
+ logFilePath := filepath.Join(baseFilePath, "export.log")
|
|
|
+ logFile, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("无法打开日志文件: %v", err)
|
|
|
+ }
|
|
|
+ defer logFile.Close()
|
|
|
+
|
|
|
+ // 创建日志记录器,不添加任何前缀或标志
|
|
|
+ logger := log.New(logFile, "", 0)
|
|
|
+
|
|
|
+ // 估算数据传输速率
|
|
|
+ dataTransferRate, err := estimateDataTransferRate()
|
|
|
+ if err != nil {
|
|
|
+ msg := fmt.Sprintf("%s 无法估算数据传输速率,默认使用 50 MB/s。错误信息:%v\n", currentTime(), err)
|
|
|
+ fmt.Print(msg)
|
|
|
+ logger.Print(msg)
|
|
|
+ dataTransferRate = 50 * 1024 * 1024 // 默认 50 MB/s
|
|
|
+ } else {
|
|
|
+ msg := fmt.Sprintf("%s 估算的数据传输速率为:%.2f MB/s\n", currentTime(), dataTransferRate/(1024*1024))
|
|
|
+ fmt.Print(msg)
|
|
|
+ logger.Print(msg)
|
|
|
+ }
|
|
|
+
|
|
|
+ // 遍历每个表进行处理,使用协程
|
|
|
+ for _, tableName := range sortedTableNames(tableInfos) {
|
|
|
+ tableInfo := tableInfos[tableName]
|
|
|
+ wg.Add(1)
|
|
|
+ go func(tableInfo models.TableInfo) {
|
|
|
+ tableSemaphore <- struct{}{} // 获取表级信号量
|
|
|
+ defer func() {
|
|
|
+ <-tableSemaphore // 释放表级信号量
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+
|
|
|
+ tableName := tableInfo.Name
|
|
|
+ totalRecords := tableInfo.Count
|
|
|
+
|
|
|
+ // 创建一个 TableResult 来保存该表的结果和日志
|
|
|
+ result := TableResult{
|
|
|
+ TableName: tableName,
|
|
|
+ TotalRows: totalRecords,
|
|
|
+ GoroutineCount: 1, // 包含当前协程
|
|
|
+ }
|
|
|
+
|
|
|
+ // 创建存储当前表的文件夹
|
|
|
+ tableDir := filepath.Join(baseFilePath, tableName)
|
|
|
+ if err := os.MkdirAll(tableDir, 0755); err != nil {
|
|
|
+ msg := fmt.Sprintf("%s 创建表 %s 文件夹失败: %v\n", currentTime(), tableName, err)
|
|
|
+ logMessage(&result, msg, logger)
|
|
|
+
|
|
|
+ result.Success = false
|
|
|
+ result.Error = err
|
|
|
+ result.ErrorCount = 1
|
|
|
+
|
|
|
+ resultsMutex.Lock()
|
|
|
+ resultsMap[tableName] = result
|
|
|
+ resultsMutex.Unlock()
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ // 估算每条记录的平均大小
|
|
|
+ averageRecordSize, err := estimateAverageRecordSize(*executor, tableName)
|
|
|
+ if err != nil {
|
|
|
+ msg := fmt.Sprintf("%s 无法估算表 %s 平均记录大小: %v\n", currentTime(), tableName, err)
|
|
|
+ logMessage(&result, msg, logger)
|
|
|
+
|
|
|
+ result.Success = false
|
|
|
+ result.Error = err
|
|
|
+ result.ErrorCount = 1
|
|
|
+
|
|
|
+ resultsMutex.Lock()
|
|
|
+ resultsMap[tableName] = result
|
|
|
+ resultsMutex.Unlock()
|
|
|
+ return
|
|
|
+ }
|
|
|
+ result.AverageRowSize = averageRecordSize
|
|
|
+
|
|
|
+ msg := fmt.Sprintf("%s 表 %s 的平均记录大小: %d 字节\n", currentTime(), tableName, averageRecordSize)
|
|
|
+ logMessage(&result, msg, logger)
|
|
|
+
|
|
|
+ // 计算表的容量
|
|
|
+ tableSize := averageRecordSize * int64(totalRecords)
|
|
|
+ result.TableSize = tableSize
|
|
|
+ tableSizeMB := float64(tableSize) / (1024.0 * 1024.0)
|
|
|
+ msg = fmt.Sprintf("%s 表 %s 的容量约为:%.2f MB\n", currentTime(), tableName, tableSizeMB)
|
|
|
+ logMessage(&result, msg, logger)
|
|
|
+
|
|
|
+ // 估算导出时间
|
|
|
+ estimatedDurationSeconds := float64(tableSize) / dataTransferRate
|
|
|
+ estimatedDuration := time.Duration(estimatedDurationSeconds * float64(time.Second))
|
|
|
+ result.EstimatedDuration = estimatedDuration
|
|
|
+
|
|
|
+ msg = fmt.Sprintf("%s 表 %s 估算导出时间:%.2f 秒\n", currentTime(), tableName, estimatedDurationSeconds)
|
|
|
+ logMessage(&result, msg, logger)
|
|
|
+
|
|
|
+ // 计算每个 CSV 文件应包含的记录数
|
|
|
+ maxFileSizeBytes := maxFileSizeMB * 1024 * 1024 // 将 MB 转换为字节
|
|
|
+ recordsPerFile := int(maxFileSizeBytes / averageRecordSize)
|
|
|
+ if recordsPerFile <= 0 {
|
|
|
+ recordsPerFile = 1000 // 设置一个最小值,避免 recordsPerFile 为 0
|
|
|
+ }
|
|
|
+ msg = fmt.Sprintf("%s 表 %s 的每个文件应包含的记录数: %d\n", currentTime(), tableName, recordsPerFile)
|
|
|
+ logMessage(&result, msg, logger)
|
|
|
+
|
|
|
+ // 计算需要的 CSV 文件数量
|
|
|
+ totalFiles := (totalRecords + recordsPerFile - 1) / recordsPerFile
|
|
|
+ msg = fmt.Sprintf("%s 表 %s 需要的 CSV 文件数量: %d\n", currentTime(), tableName, totalFiles)
|
|
|
+ logMessage(&result, msg, logger)
|
|
|
+
|
|
|
+ // 记录导出开始时间
|
|
|
+ startTime := time.Now()
|
|
|
+
|
|
|
+ var fileWg sync.WaitGroup
|
|
|
+ fileSemaphore := make(chan struct{}, maxConcurrentFileExports)
|
|
|
+ maxRetries := 3
|
|
|
+
|
|
|
+ // 错误计数器
|
|
|
+ var errorCount int32
|
|
|
+
|
|
|
+ // 为每个 CSV 文件启动一个协程
|
|
|
+ for fileIndex := 0; fileIndex < totalFiles; fileIndex++ {
|
|
|
+ fileWg.Add(1)
|
|
|
+ result.GoroutineCount++ // 增加协程计数器
|
|
|
+
|
|
|
+ go func(fileIndex int) {
|
|
|
+ fileSemaphore <- struct{}{} // 获取文件级信号量
|
|
|
+ defer func() {
|
|
|
+ <-fileSemaphore // 释放文件级信号量
|
|
|
+ fileWg.Done()
|
|
|
+ }()
|
|
|
+
|
|
|
+ offset := fileIndex * recordsPerFile
|
|
|
+ limit := recordsPerFile
|
|
|
+ if offset+limit > totalRecords {
|
|
|
+ limit = totalRecords - offset
|
|
|
+ }
|
|
|
+
|
|
|
+ csvFilePath := filepath.Join(tableDir, fmt.Sprintf("%s_%d.csv", tableName, fileIndex+1))
|
|
|
+
|
|
|
+ attempt := 0
|
|
|
+ for {
|
|
|
+ attempt++
|
|
|
+ err := exportDataRangeToCSV(*executor, tableName, csvFilePath, delimiter, offset, limit, pageSize)
|
|
|
+ if err != nil {
|
|
|
+ if attempt < maxRetries {
|
|
|
+ msg := fmt.Sprintf("%s 表 %s 导出到文件 %s 时发生错误 (第 %d 次尝试): %v,重试...\n", currentTime(), tableName, csvFilePath, attempt, err)
|
|
|
+ logMessage(&result, msg, logger)
|
|
|
+ time.Sleep(2 * time.Second)
|
|
|
+ continue
|
|
|
+ } else {
|
|
|
+ msg := fmt.Sprintf("%s 表 %s 导出到文件 %s 时发生错误 (第 %d 次尝试): %v,已达最大重试次数。\n", currentTime(), tableName, csvFilePath, attempt, err)
|
|
|
+ logMessage(&result, msg, logger)
|
|
|
+ // 增加错误计数器
|
|
|
+ errorCount++
|
|
|
+ break
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ msg := fmt.Sprintf("%s 表 %s 的文件 %s 导出完成。\n", currentTime(), tableName, filepath.Base(csvFilePath))
|
|
|
+ logMessage(&result, msg, logger)
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }(fileIndex)
|
|
|
+ }
|
|
|
+
|
|
|
+ // 等待当前表的所有文件导出完成
|
|
|
+ fileWg.Wait()
|
|
|
+
|
|
|
+ // 计算实际导出耗时
|
|
|
+ exportDuration := time.Since(startTime)
|
|
|
+ result.ExportDuration = exportDuration
|
|
|
+
|
|
|
+ msg = fmt.Sprintf("%s 表 %s 导出完成,共生成 %d 个文件,使用了 %d 个协程。\n", currentTime(), tableName, totalFiles, result.GoroutineCount)
|
|
|
+ logMessage(&result, msg, logger)
|
|
|
+
|
|
|
+ success := errorCount == 0
|
|
|
+ var finalError error
|
|
|
+ if !success {
|
|
|
+ finalError = fmt.Errorf("表 %s 导出过程中发生 %d 个错误", tableName, errorCount)
|
|
|
+ }
|
|
|
+
|
|
|
+ result.Success = success
|
|
|
+ result.Error = finalError
|
|
|
+ result.ErrorCount = int(errorCount)
|
|
|
+ result.GeneratedFiles = totalFiles
|
|
|
+
|
|
|
+ resultsMutex.Lock()
|
|
|
+ resultsMap[tableName] = result
|
|
|
+ resultsMutex.Unlock()
|
|
|
+ }(tableInfo)
|
|
|
+ }
|
|
|
+
|
|
|
+ // 等待所有表处理完成
|
|
|
+ wg.Wait()
|
|
|
+
|
|
|
+ // 写入日志文件
|
|
|
+ err = writeLogFile(logFilePath, resultsMap, tableInfos)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("无法写入日志文件: %v", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+/**
|
|
|
+ * estimateAverageRecordSize 估算指定表中每条记录的平均大小(以字节为单位)。
|
|
|
+ *
|
|
|
+ * 参数:
|
|
|
+ * - executor: 数据库执行器,用于执行数据库查询。
|
|
|
+ * - tableName: 需要估算的表名。
|
|
|
+ *
|
|
|
+ * 返回值:
|
|
|
+ * - int64: 平均每条记录的大小(字节)。
|
|
|
+ * - error: 如果发生错误,返回错误信息。
|
|
|
+ */
|
|
|
+func estimateAverageRecordSize(executor db_executor.DBExecutor, tableName string) (int64, error) {
|
|
|
+ // 随机抽取一定数量的记录,计算平均大小
|
|
|
+ sampleSize := 100 // 抽样的记录数
|
|
|
+ offset := 0 // 从第 0 条记录开始抽样
|
|
|
+
|
|
|
+ // 使用 ReadTableFromDBWithPagination 函数获取样本数据
|
|
|
+ _, records, err := executor.ReadTableFromDBWithPagination(tableName, offset, sampleSize)
|
|
|
+ if err != nil {
|
|
|
+ return 0, fmt.Errorf("failed to query table: %v", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ // 检查是否获取到了数据
|
|
|
+ if records == nil || len(records) == 0 {
|
|
|
+ return 0, fmt.Errorf("没有获取到抽样记录,表数量可能为 0")
|
|
|
+ }
|
|
|
+
|
|
|
+ var totalSize int64 = 0 // 总记录大小
|
|
|
+ var count int64 = 0 // 记录数
|
|
|
+
|
|
|
+ // 遍历每一条记录,计算其大小
|
|
|
+ for _, record := range records {
|
|
|
+ var recordSize int64 = 0
|
|
|
+ for _, val := range record {
|
|
|
+ if val != "" {
|
|
|
+ recordSize += int64(len(val)) // 累加字段值的长度
|
|
|
+ }
|
|
|
+ }
|
|
|
+ totalSize += recordSize // 累加每条记录的大小
|
|
|
+ count++
|
|
|
+ }
|
|
|
+
|
|
|
+ if count == 0 {
|
|
|
+ return 0, fmt.Errorf("没有获取到抽样记录,表数量可能为 0")
|
|
|
+ }
|
|
|
+
|
|
|
+ averageSize := totalSize / count // 计算平均记录大小
|
|
|
+ return averageSize, nil
|
|
|
+}
|
|
|
+
|
|
|
+// exportDataRangeToCSV 导出指定范围的数据到 CSV 文件
|
|
|
+// 参数:
|
|
|
+// - executor: 数据库执行器
|
|
|
+// - tableName: 表名
|
|
|
+// - csvFilePath: CSV 文件路径
|
|
|
+// - delimiter: CSV 分隔符
|
|
|
+// - offset: 数据起始偏移量
|
|
|
+// - limit: 数据读取限制数量
|
|
|
+// - pageSize: 分页大小
|
|
|
+// 返回值:
|
|
|
+// - error: 如果发生错误,返回错误信息
|
|
|
+func exportDataRangeToCSV(
|
|
|
+ executor db_executor.DBExecutor,
|
|
|
+ tableName string,
|
|
|
+ csvFilePath string,
|
|
|
+ delimiter rune,
|
|
|
+ offset int,
|
|
|
+ limit int,
|
|
|
+ pageSize int,
|
|
|
+) error {
|
|
|
+ // 首先确保 CSV 文件存在,创建新文件并写入表头
|
|
|
+ var headers []string
|
|
|
+ var headersWritten bool = false
|
|
|
+
|
|
|
+ // 计算需要分页的次数
|
|
|
+ totalPages := (limit + pageSize - 1) / pageSize
|
|
|
+
|
|
|
+ for page := 0; page < totalPages; page++ {
|
|
|
+ pageOffset := offset + page*pageSize
|
|
|
+ pageLimit := pageSize
|
|
|
+ if pageOffset+pageLimit > offset+limit {
|
|
|
+ pageLimit = (offset + limit) - pageOffset
|
|
|
+ }
|
|
|
+
|
|
|
+ // 从数据库读取数据
|
|
|
+ currentHeaders, records, err := executor.ReadTableFromDBWithPagination(tableName, pageOffset, pageLimit)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("failed to read data from table %s: %v", tableName, err)
|
|
|
+ }
|
|
|
+
|
|
|
+ // 仅在首次获取到表头时进行赋值
|
|
|
+ if !headersWritten && currentHeaders != nil {
|
|
|
+ headers = currentHeaders
|
|
|
+ headersWritten = true
|
|
|
+ }
|
|
|
+
|
|
|
+ // 判断是否需要写入表头
|
|
|
+ appendToFile := page > 0 // 第一页时创建新文件,之后的页追加写入
|
|
|
+
|
|
|
+ // 写入数据到 CSV 文件
|
|
|
+ err = csvwriter.WriteRecordsToCSV(csvFilePath, headers, records, delimiter, appendToFile)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("failed to write data to CSV: %v", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ fmt.Printf("表 %s 文件 %s 已写入 %d/%d 页。\n", tableName, filepath.Base(csvFilePath), page+1, totalPages)
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|