package capture import ( "database/sql" "fmt" "log" "time" "github.com/google/uuid" ) type OALicRecord struct { OA_ID sql.NullInt32 `json:"OA_ID"` OA_REQUESTID sql.NullInt32 `json:"oa_request_id"` OA_REQUESTNAME sql.NullString `json:"oa_request_name"` OA_REQUESTNAMENEW sql.NullString `json:"oa_request_name_new"` OA_REQUESTNAMEHTMLNEW sql.NullString `json:"oa_request_name_html_new"` OA_GLXMID sql.NullString `json:"oa_glxmid"` OA_GLXMNAME sql.NullString `json:"oa_glxmname"` OA_SQSJ sql.NullString `json:"oa_sqsj"` OA_SALESPERSONNAME sql.NullString `json:"oa_sales_person_name"` OA_XSJSYX sql.NullString `json:"oa_sales_email"` OA_OPERATIONSPERSONNAME sql.NullString `json:"oa_operations_person_name"` OA_JFJSYX sql.NullString `json:"oa_operations_email"` OA_SYDW sql.NullString `json:"oa_using_unit"` OA_XMXXMS sql.NullString `json:"oa_project_detail"` OA_JDS sql.NullInt32 `json:"oa_total_nodes"` OA_NODECOUNT sql.NullInt32 `json:"oa_node_count"` OA_PRODUCTCODE sql.NullString `json:"oa_product_code"` OA_PRODUCTNAME sql.NullString `json:"oa_product_name"` OA_PRODUCTVERSION sql.NullString `json:"oa_product_version"` OA_CPU sql.NullString `json:"oa_cpu"` OA_OPERATINGSYSTEM sql.NullString `json:"oa_operating_system"` OA_MAINMAC sql.NullString `json:"oa_main_mac"` OA_SECONDMAC sql.NullString `json:"oa_secondary_mac"` OA_CREATIONDATE sql.NullString `json:"oa_creation_date"` OA_CREATIONTIME sql.NullString `json:"oa_creation_time"` OA_LASTOPERATEDATE sql.NullString `json:"oa_last_operation_date"` OA_LASTOPERATETIME sql.NullString `json:"oa_last_operation_time"` CaptureTime sql.NullTime `json:"capture_time"` } func StartCapture(sourceDB, targetDB *sql.DB) { InitLogs("./logs/capture", "info") tableName := "target_OA_license" // 读取上次同步时间 lastSyncTime, err := getLastSyncTime(targetDB, tableName) if err != nil { Logger.Fatal("获取上次同步时间错误 Failed to retrieve last sync time from target DB:", err) } // 启动数据同步,每隔15分钟运行一次 for { Logger.Println("Start capturing data...") checkTemp, err := checkSourceOaLicTime(sourceDB, lastSyncTime) if err != nil { Logger.Error("Failed to checkSourceOaLicTime:", err) continue } if !checkTemp { time.Sleep(15 * time.Second) continue } Logger.Println("有新数据") lastSyncTime1, count, err := captureData(sourceDB, targetDB, lastSyncTime) lastSyncTime = lastSyncTime1 if err != nil { Logger.Error("抓取源数据错误 Failed to capture data:", err) } if err := insertSyncMetadata(targetDB, tableName, lastSyncTime); err != nil { Logger.Error("插入同步时间表错误 Failed to insert sync metadata into target DB:", err) } Logger.Println("插入时间: ", lastSyncTime) if count > 0 { //lic关联用户表 err := insertLicToUser(count, targetDB) if err != nil { Logger.Error("关联用户表错误 Failed to insertLicToUser:", err) } } time.Sleep(15 * time.Second) } } func insertLicToUser(count int, targetDB *sql.DB) error { //查询抓取表中最新的count条数据 query := "SELECT UNIQUE_ID,OA_SALESPERSONNAME, OA_OPERATIONSPERSONNAME FROM target_OA_license ORDER BY capture_time DESC LIMIT ?" rows, err := targetDB.Query(query, count) if err != nil { Logger.Println("查询源库最新数据错误 Failed to execute query on source DB:", err) return err } defer rows.Close() Logger.Println("查询到最新的count条数据 ", count) for rows.Next() { var licUniqueID, salesPersonName, operationsPersonName sql.NullString err := rows.Scan(&licUniqueID, &salesPersonName, &operationsPersonName) if err != nil { Logger.Error("插入到目标库错误 insertLicToUser find newdata Failed to scan row:", err) continue } if salesPersonName.Valid || operationsPersonName.Valid != false { //将获取到的字段,和用户表关联对比 var userUNIQUEID, userACCOUNT sql.NullString queryLIC_USER := `SELECT UNIQUEID, ACCOUNT FROM LIC_USER where (USERNAME = ? OR USERNAME = ? ) and (role = 'ADMIN' OR role = 'supportRole' OR role = 'salesRole');` err = targetDB.QueryRow(queryLIC_USER, salesPersonName, operationsPersonName).Scan(&userUNIQUEID, &userACCOUNT) if err == sql.ErrNoRows { Logger.Warn("No rows were returned") continue } if err != nil { Logger.Error("find user name Failed to scan row :", err) continue } //fmt.Println("关联到用户 ", userUNIQUEID, userACCOUNT) insertQuery := "INSERT INTO LICENSERECORDTOUSER(LICENSE_UNIQUEID,USER_UNIQUEID,USER_ACCOUNT,OPERATOR_UNIQUEID,UP_TIME) VALUES(?,?,?,'SYSTEM_AUTO',?)" _, err = targetDB.Exec(insertQuery, licUniqueID, userUNIQUEID, userACCOUNT, time.Now(), ) if err != nil { Logger.Error("lic关联到目标库用户错误 Failed to sql2 insert record into target DB:", err) } } } return nil } // CaptureOnce 主动触发一次数据抓取 func CaptureOnce(sourceDB, targetDB *sql.DB) error { tableName := "target_OA_license" // 读取上次同步时间 lastSyncTime, err := getLastSyncTime(targetDB, tableName) if err != nil { Logger.Error("Failed to retrieve last sync time from target DB:", err) return err } // 执行一次数据抓取 lastSyncTime, count, err := captureData(sourceDB, targetDB, lastSyncTime) if err != nil { Logger.Error("Failed to capture data:", err) return err } if err := insertSyncMetadata(targetDB, tableName, lastSyncTime); err != nil { Logger.Error("Failed to insert sync metadata into target DB:", err) return err } Logger.Println("Captured", count, "records.") if count > 0 { //lic关联用户表 err := insertLicToUser(count, targetDB) if err != nil { Logger.Error("insertLicToUser Failed to insert record into target DB:", err) return err } } return nil } // getLastSyncTime 从数据库中获取指定表的上次同步时间 func getLastSyncTime(db *sql.DB, tableName string) (time.Time, error) { var lastSyncTime time.Time query := ` SELECT last_sync_time FROM sync_metadata WHERE table_name = ? ORDER BY last_sync_time DESC LIMIT 1 ` err := db.QueryRow(query, tableName).Scan(&lastSyncTime) if err != nil { return time.Time{}, err } return lastSyncTime, nil } // insertSyncMetadata 插入新的同步元数据 func insertSyncMetadata(db *sql.DB, tableName string, lastSyncTime time.Time) error { query := ` INSERT INTO sync_metadata (table_name, last_sync_time) VALUES (?, ?) ` _, err := db.Exec(query, tableName, lastSyncTime) return err } func checkSourceOaLicTime(sourceDB *sql.DB, lastSyncTime time.Time) (bool, error) { var checkCount int checkTime := `SELECT count(1) FROM XUGU.WORKFLOW_REQUESTBASE WR where TO_TIMESTAMP(wr.CREATEDATE || ' ' || wr.CREATETIME, 'YYYY-MM-DD HH24:MI:SS') > TO_TIMESTAMP(?, 'YYYY-MM-DD HH24:MI:SS');` error := sourceDB.QueryRow(checkTime, lastSyncTime).Scan(&checkCount) if error != nil { Logger.Println("Failed to check count:", error) return false, error } if checkCount == 0 { Logger.Println("没有新数据需要同步") return false, nil } return true, nil } // captureData 执行数据抓取和同步 func captureData(sourceDB, targetDB *sql.DB, lastSyncTime time.Time) (time.Time, int, error) { query := ` SELECT fmd.id , fm.REQUESTID, wr.REQUESTNAME, wr.REQUESTNAMENEW, wr.REQUESTNAMEHTMLNEW, fm.glxm, PP.NAME, fm.SQSJ, hrm1.LASTNAME, hrm2.LASTNAME, fm.XSJSYX, fm.JFJSYX, fm.SYDW, fm.XMXXMS, fm.JDS, fmd.JDS, ws1.SELECTVALUE, ws1.SELECTNAME, ws2.SELECTNAME, fmd.CLQ, fmd.CZXT, fmd.IP, fmd.MAC, wr.CREATEDATE, wr.CREATETIME, wr.LASTOPERATEDATE, wr.LASTOPERATETIME FROM XUGU.formtable_main_146 fm LEFT JOIN XUGU.HRMRESOURCE hrm1 ON TO_NUMBER(fm.XSRY) = hrm1.id LEFT JOIN XUGU.HRMRESOURCE hrm2 ON TO_NUMBER(fm.jfry) = hrm2.id LEFT JOIN XUGU.FORMTABLE_MAIN_146_dt1 fmd ON fmd.mainid = fm.id LEFT JOIN XUGU.WORKFLOW_SELECTITEM ws1 ON fmd.cpmc = ws1.SELECTVALUE AND ws1.FIELDID = 14627 LEFT JOIN XUGU.WORKFLOW_SELECTITEM ws2 ON fmd.BB = ws2.SELECTVALUE AND ws2.FIELDID = 14628 LEFT JOIN XUGU.WORKFLOW_REQUESTBASE WR ON fm.REQUESTID = WR.REQUESTID LEFT JOIN XUGU.PRJ_PROJECTINFO PP ON fm.glxm = PP.ID WHERE TO_TIMESTAMP(wr.CREATEDATE || ' ' || wr.CREATETIME, 'YYYY-MM-DD HH24:MI:SS') > TO_TIMESTAMP(?, 'YYYY-MM-DD HH24:MI:SS') ORDER BY wr.CREATEDATE DESC, wr.CREATETIME DESC PARALLEL 8` var count int rows, err := sourceDB.Query(query, lastSyncTime.Format("2006-01-02 15:04:05")) if err != nil { Logger.Println("Failed to execute query on source DB:", err) return lastSyncTime, 0, err } defer rows.Close() var newSyncTime time.Time firstRecord := true for rows.Next() { var record OALicRecord count++ if err := rows.Scan( &record.OA_ID, &record.OA_REQUESTID, &record.OA_REQUESTNAME, &record.OA_REQUESTNAMENEW, &record.OA_REQUESTNAMEHTMLNEW, &record.OA_GLXMID, &record.OA_GLXMNAME, &record.OA_SQSJ, &record.OA_SALESPERSONNAME, &record.OA_OPERATIONSPERSONNAME, &record.OA_XSJSYX, &record.OA_JFJSYX, &record.OA_SYDW, &record.OA_XMXXMS, &record.OA_JDS, &record.OA_NODECOUNT, &record.OA_PRODUCTCODE, &record.OA_PRODUCTNAME, &record.OA_PRODUCTVERSION, &record.OA_CPU, &record.OA_OPERATINGSYSTEM, &record.OA_MAINMAC, &record.OA_SECONDMAC, &record.OA_CREATIONDATE, &record.OA_CREATIONTIME, &record.OA_LASTOPERATEDATE, &record.OA_LASTOPERATETIME, ); err != nil { log.Println("Failed to scan row from source DB:", err) return lastSyncTime, 0, err } record.CaptureTime = sql.NullTime{ Time: time.Now(), Valid: true, } // 插入新数据到目标数据库 err := insertOALicRecordIntoTargetDB(targetDB, record) if err != nil { Logger.Println("Failed to insert record into target DB:", err) return lastSyncTime, 0, err } //fmt.Printf("Inserted record with OA_REQUESTID %d into target DB.\n", record.OA_REQUESTID.Int32) // 在第一次迭代时更新 newSyncTime 为当前记录的创建时间 if firstRecord && record.OA_CREATIONDATE.Valid && record.OA_CREATIONTIME.Valid { crDate, _ := time.Parse("2006-01-02", record.OA_CREATIONDATE.String) crTime, _ := time.Parse("15:04:05", record.OA_CREATIONTIME.String) newSyncTime = time.Date(crDate.Year(), crDate.Month(), crDate.Day(), crTime.Hour(), crTime.Minute(), crTime.Second(), 0, time.Local) firstRecord = false } } // 如果有新数据同步,则更新 lastSyncTime if !newSyncTime.IsZero() { lastSyncTime = newSyncTime } return lastSyncTime, count, err } // insertOALicRecordIntoTargetDB 插入新记录到目标数据库 func insertOALicRecordIntoTargetDB(targetDB *sql.DB, record OALicRecord) error { //fmt.Println("插入新记录到目标数据库") tx, err := targetDB.Begin() if err != nil { return fmt.Errorf("begin transaction: %v", err) } defer func() { if err != nil { tx.Rollback() } else { err = tx.Commit() } }() uniqueID := uuid.New() sql1 := ` INSERT INTO target_OA_license ( Unique_ID, OA_ID, OA_REQUESTID, OA_REQUESTNAME, OA_REQUESTNAMENEW, OA_REQUESTNAMEHTMLNEW, OA_GLXMID, OA_GLXMNAME, OA_SQSJ, OA_SALESPERSONNAME, OA_XSJSYX, OA_OPERATIONSPERSONNAME, OA_JFJSYX, OA_SYDW, OA_XMXXMS, OA_JDS, OA_NODECOUNT, OA_PRODUCTCODE, OA_PRODUCTNAME, OA_PRODUCTVERSION, OA_CPU, OA_OPERATINGSYSTEM, OA_MAINMAC, OA_SECONDMAC, OA_CREATIONDATE, OA_CREATIONTIME, OA_LASTOPERATEDATE, OA_LASTOPERATETIME, capture_Time ) VALUES (?,?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?,? ,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?, ?)` _, err = tx.Exec(sql1, uniqueID, record.OA_ID, record.OA_REQUESTID, record.OA_REQUESTNAME, record.OA_REQUESTNAMENEW, record.OA_REQUESTNAMEHTMLNEW, record.OA_GLXMID, record.OA_GLXMNAME, record.OA_SQSJ, record.OA_SALESPERSONNAME, record.OA_XSJSYX, record.OA_OPERATIONSPERSONNAME, record.OA_JFJSYX, record.OA_SYDW, record.OA_XMXXMS, record.OA_JDS, record.OA_NODECOUNT, record.OA_PRODUCTCODE, record.OA_PRODUCTNAME, record.OA_PRODUCTVERSION, record.OA_CPU, record.OA_OPERATINGSYSTEM, record.OA_MAINMAC, record.OA_SECONDMAC, record.OA_CREATIONDATE, record.OA_CREATIONTIME, record.OA_LASTOPERATEDATE, record.OA_LASTOPERATETIME, record.CaptureTime, ) if err != nil { log.Println("Failed to sql1 insert record into target DB:", err) } sql2 := ` INSERT INTO License_generate_Info ( License_UniqueID, OA_ID, License_Flage) VALUES (?, ?,?) ` _, err = tx.Exec(sql2, uniqueID, record.OA_ID, "未生成", ) if err != nil { Logger.Println("Failed to sql2 insert record into target DB:", err) return err } return nil }