123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446 |
- package capture
- import (
- "database/sql"
- "fmt"
- "log"
- "time"
- "github.com/google/uuid"
- )
// OALicRecord is one license-request row as read from the OA source database
// by captureData and written to the target_OA_license table.
//
// Every field uses a database/sql Null* wrapper so that NULL column values can
// be scanned. The trailing comments name the source column each field is
// scanned from in captureData's SELECT list.
type OALicRecord struct {
	// Request identity and titles.
	OA_ID                 sql.NullInt32  `json:"OA_ID"`                    // fmd.id
	OA_REQUESTID          sql.NullInt32  `json:"oa_request_id"`            // fm.REQUESTID
	OA_REQUESTNAME        sql.NullString `json:"oa_request_name"`          // wr.REQUESTNAME
	OA_REQUESTNAMENEW     sql.NullString `json:"oa_request_name_new"`      // wr.REQUESTNAMENEW
	OA_REQUESTNAMEHTMLNEW sql.NullString `json:"oa_request_name_html_new"` // wr.REQUESTNAMEHTMLNEW

	// Project, applicant and contact columns.
	OA_GLXMID               sql.NullString `json:"oa_glxmid"`                 // fm.glxm
	OA_GLXMNAME             sql.NullString `json:"oa_glxmname"`               // PP.NAME
	OA_SQSJ                 sql.NullString `json:"oa_sqsj"`                   // fm.SQSJ
	OA_SALESPERSONNAME      sql.NullString `json:"oa_sales_person_name"`      // hrm1.LASTNAME
	OA_XSJSYX               sql.NullString `json:"oa_sales_email"`            // fm.XSJSYX
	OA_OPERATIONSPERSONNAME sql.NullString `json:"oa_operations_person_name"` // hrm2.LASTNAME
	OA_JFJSYX               sql.NullString `json:"oa_operations_email"`       // fm.JFJSYX
	OA_SYDW                 sql.NullString `json:"oa_using_unit"`             // fm.SYDW
	OA_XMXXMS               sql.NullString `json:"oa_project_detail"`         // fm.XMXXMS

	// Node counts and product/environment columns.
	OA_JDS             sql.NullInt32  `json:"oa_total_nodes"`      // fm.JDS
	OA_NODECOUNT       sql.NullInt32  `json:"oa_node_count"`       // fmd.JDS
	OA_PRODUCTCODE     sql.NullString `json:"oa_product_code"`     // ws1.SELECTVALUE
	OA_PRODUCTNAME     sql.NullString `json:"oa_product_name"`     // ws1.SELECTNAME
	OA_PRODUCTVERSION  sql.NullString `json:"oa_product_version"`  // ws2.SELECTNAME
	OA_CPU             sql.NullString `json:"oa_cpu"`              // fmd.CLQ
	OA_OPERATINGSYSTEM sql.NullString `json:"oa_operating_system"` // fmd.CZXT
	// NOTE(review): OA_MAINMAC is scanned from fmd.IP — confirm this mapping
	// is intended.
	OA_MAINMAC   sql.NullString `json:"oa_main_mac"`      // fmd.IP
	OA_SECONDMAC sql.NullString `json:"oa_secondary_mac"` // fmd.MAC

	// Request creation / last-operation stamps, kept as the source's
	// separate date and time strings.
	OA_CREATIONDATE    sql.NullString `json:"oa_creation_date"`       // wr.CREATEDATE
	OA_CREATIONTIME    sql.NullString `json:"oa_creation_time"`       // wr.CREATETIME
	OA_LASTOPERATEDATE sql.NullString `json:"oa_last_operation_date"` // wr.LASTOPERATEDATE
	OA_LASTOPERATETIME sql.NullString `json:"oa_last_operation_time"` // wr.LASTOPERATETIME

	// CaptureTime is set by captureData to the wall-clock time at which the
	// row was read.
	CaptureTime sql.NullTime `json:"capture_time"`
}
- func StartCapture(sourceDB, targetDB *sql.DB) {
- InitLogs("./logs/capture", "info")
- tableName := "target_OA_license"
- // 读取上次同步时间
- lastSyncTime, err := getLastSyncTime(targetDB, tableName)
- if err != nil {
- Logger.Fatal("获取上次同步时间错误 Failed to retrieve last sync time from target DB:", err)
- }
- // 启动数据同步,每隔15分钟运行一次
- for {
- Logger.Println("Start capturing data...")
- checkTemp, err := checkSourceOaLicTime(sourceDB, lastSyncTime)
- if err != nil {
- Logger.Error("Failed to checkSourceOaLicTime:", err)
- continue
- }
- if !checkTemp {
- time.Sleep(15 * time.Second)
- continue
- }
- Logger.Println("有新数据")
- lastSyncTime1, count, err := captureData(sourceDB, targetDB, lastSyncTime)
- lastSyncTime = lastSyncTime1
- if err != nil {
- Logger.Error("抓取源数据错误 Failed to capture data:", err)
- }
- if err := insertSyncMetadata(targetDB, tableName, lastSyncTime); err != nil {
- Logger.Error("插入同步时间表错误 Failed to insert sync metadata into target DB:", err)
- }
- Logger.Println("插入时间: ", lastSyncTime)
- if count > 0 {
- //lic关联用户表
- err := insertLicToUser(count, targetDB)
- if err != nil {
- Logger.Error("关联用户表错误 Failed to insertLicToUser:", err)
- }
- }
- time.Sleep(15 * time.Second)
- }
- }
- func insertLicToUser(count int, targetDB *sql.DB) error {
- //查询抓取表中最新的count条数据
- query := "SELECT UNIQUE_ID,OA_SALESPERSONNAME, OA_OPERATIONSPERSONNAME FROM target_OA_license ORDER BY capture_time DESC LIMIT ?"
- rows, err := targetDB.Query(query, count)
- if err != nil {
- Logger.Println("查询源库最新数据错误 Failed to execute query on source DB:", err)
- return err
- }
- defer rows.Close()
- Logger.Println("查询到最新的count条数据 ", count)
- for rows.Next() {
- var licUniqueID, salesPersonName, operationsPersonName sql.NullString
- err := rows.Scan(&licUniqueID, &salesPersonName, &operationsPersonName)
- if err != nil {
- Logger.Error("插入到目标库错误 insertLicToUser find newdata Failed to scan row:", err)
- continue
- }
- if salesPersonName.Valid || operationsPersonName.Valid != false {
- //将获取到的字段,和用户表关联对比
- var userUNIQUEID, userACCOUNT sql.NullString
- queryLIC_USER := `SELECT UNIQUEID, ACCOUNT FROM LIC_USER where
- (USERNAME = ? OR USERNAME = ? )
- and (role = 'ADMIN' OR role = 'supportRole' OR role = 'salesRole');`
- err = targetDB.QueryRow(queryLIC_USER, salesPersonName, operationsPersonName).Scan(&userUNIQUEID, &userACCOUNT)
- if err == sql.ErrNoRows {
- Logger.Warn("No rows were returned")
- continue
- }
- if err != nil {
- Logger.Error("find user name Failed to scan row :", err)
- continue
- }
- //fmt.Println("关联到用户 ", userUNIQUEID, userACCOUNT)
- insertQuery := "INSERT INTO LICENSERECORDTOUSER(LICENSE_UNIQUEID,USER_UNIQUEID,USER_ACCOUNT,OPERATOR_UNIQUEID,UP_TIME) VALUES(?,?,?,'SYSTEM_AUTO',?)"
- _, err = targetDB.Exec(insertQuery,
- licUniqueID,
- userUNIQUEID,
- userACCOUNT,
- time.Now(),
- )
- if err != nil {
- Logger.Error("lic关联到目标库用户错误 Failed to sql2 insert record into target DB:", err)
- }
- }
- }
- return nil
- }
- // CaptureOnce 主动触发一次数据抓取
- func CaptureOnce(sourceDB, targetDB *sql.DB) error {
- tableName := "target_OA_license"
- // 读取上次同步时间
- lastSyncTime, err := getLastSyncTime(targetDB, tableName)
- if err != nil {
- Logger.Error("Failed to retrieve last sync time from target DB:", err)
- return err
- }
- // 执行一次数据抓取
- lastSyncTime, count, err := captureData(sourceDB, targetDB, lastSyncTime)
- if err != nil {
- Logger.Error("Failed to capture data:", err)
- return err
- }
- if err := insertSyncMetadata(targetDB, tableName, lastSyncTime); err != nil {
- Logger.Error("Failed to insert sync metadata into target DB:", err)
- return err
- }
- Logger.Println("Captured", count, "records.")
- if count > 0 {
- //lic关联用户表
- err := insertLicToUser(count, targetDB)
- if err != nil {
- Logger.Error("insertLicToUser Failed to insert record into target DB:", err)
- return err
- }
- }
- return nil
- }
// getLastSyncTime returns the most recent last_sync_time stored in
// sync_metadata for tableName.
//
// Any error from the query or scan is returned unchanged together with the
// zero time.Time; this includes sql.ErrNoRows when no entry exists for
// tableName.
func getLastSyncTime(db *sql.DB, tableName string) (time.Time, error) {
	const latestSyncQuery = `
		SELECT last_sync_time
		FROM sync_metadata
		WHERE table_name = ?
		ORDER BY last_sync_time DESC
		LIMIT 1
	`

	var synced time.Time
	if err := db.QueryRow(latestSyncQuery, tableName).Scan(&synced); err != nil {
		return time.Time{}, err
	}
	return synced, nil
}
// insertSyncMetadata appends a (table_name, last_sync_time) row to
// sync_metadata and returns the error from the INSERT, if any. Rows are only
// ever added, never updated.
func insertSyncMetadata(db *sql.DB, tableName string, lastSyncTime time.Time) error {
	const insertStmt = `
		INSERT INTO sync_metadata (table_name, last_sync_time)
		VALUES (?, ?)
	`

	if _, err := db.Exec(insertStmt, tableName, lastSyncTime); err != nil {
		return err
	}
	return nil
}
- func checkSourceOaLicTime(sourceDB *sql.DB, lastSyncTime time.Time) (bool, error) {
- var checkCount int
- checkTime := `SELECT count(1) FROM XUGU.WORKFLOW_REQUESTBASE WR
- where
- TO_TIMESTAMP(wr.CREATEDATE || ' ' || wr.CREATETIME, 'YYYY-MM-DD HH24:MI:SS') > TO_TIMESTAMP(?, 'YYYY-MM-DD HH24:MI:SS');`
- error := sourceDB.QueryRow(checkTime, lastSyncTime).Scan(&checkCount)
- if error != nil {
- Logger.Println("Failed to check count:", error)
- return false, error
- }
- if checkCount == 0 {
- Logger.Println("没有新数据需要同步")
- return false, nil
- }
- return true, nil
- }
- // captureData 执行数据抓取和同步
- func captureData(sourceDB, targetDB *sql.DB, lastSyncTime time.Time) (time.Time, int, error) {
- query := `
- SELECT
- fmd.id ,
- fm.REQUESTID,
- wr.REQUESTNAME,
- wr.REQUESTNAMENEW,
- wr.REQUESTNAMEHTMLNEW,
- fm.glxm,
- PP.NAME,
- fm.SQSJ,
- hrm1.LASTNAME,
- hrm2.LASTNAME,
- fm.XSJSYX,
- fm.JFJSYX,
- fm.SYDW,
- fm.XMXXMS,
- fm.JDS,
- fmd.JDS,
- ws1.SELECTVALUE,
- ws1.SELECTNAME,
- ws2.SELECTNAME,
- fmd.CLQ,
- fmd.CZXT,
- fmd.IP,
- fmd.MAC,
- wr.CREATEDATE,
- wr.CREATETIME,
- wr.LASTOPERATEDATE,
- wr.LASTOPERATETIME
- FROM XUGU.formtable_main_146 fm
- LEFT JOIN XUGU.HRMRESOURCE hrm1 ON TO_NUMBER(fm.XSRY) = hrm1.id
- LEFT JOIN XUGU.HRMRESOURCE hrm2 ON TO_NUMBER(fm.jfry) = hrm2.id
- LEFT JOIN XUGU.FORMTABLE_MAIN_146_dt1 fmd ON fmd.mainid = fm.id
- LEFT JOIN XUGU.WORKFLOW_SELECTITEM ws1 ON fmd.cpmc = ws1.SELECTVALUE AND ws1.FIELDID = 14627
- LEFT JOIN XUGU.WORKFLOW_SELECTITEM ws2 ON fmd.BB = ws2.SELECTVALUE AND ws2.FIELDID = 14628
- LEFT JOIN XUGU.WORKFLOW_REQUESTBASE WR ON fm.REQUESTID = WR.REQUESTID
- LEFT JOIN XUGU.PRJ_PROJECTINFO PP ON fm.glxm = PP.ID
- WHERE TO_TIMESTAMP(wr.CREATEDATE || ' ' || wr.CREATETIME, 'YYYY-MM-DD HH24:MI:SS') > TO_TIMESTAMP(?, 'YYYY-MM-DD HH24:MI:SS')
- ORDER BY wr.CREATEDATE DESC, wr.CREATETIME DESC
- PARALLEL 8`
- var count int
- rows, err := sourceDB.Query(query, lastSyncTime.Format("2006-01-02 15:04:05"))
- if err != nil {
- Logger.Println("Failed to execute query on source DB:", err)
- return lastSyncTime, 0, err
- }
- defer rows.Close()
- var newSyncTime time.Time
- firstRecord := true
- for rows.Next() {
- var record OALicRecord
- count++
- if err := rows.Scan(
- &record.OA_ID,
- &record.OA_REQUESTID,
- &record.OA_REQUESTNAME,
- &record.OA_REQUESTNAMENEW,
- &record.OA_REQUESTNAMEHTMLNEW,
- &record.OA_GLXMID,
- &record.OA_GLXMNAME,
- &record.OA_SQSJ,
- &record.OA_SALESPERSONNAME,
- &record.OA_OPERATIONSPERSONNAME,
- &record.OA_XSJSYX,
- &record.OA_JFJSYX,
- &record.OA_SYDW,
- &record.OA_XMXXMS,
- &record.OA_JDS,
- &record.OA_NODECOUNT,
- &record.OA_PRODUCTCODE,
- &record.OA_PRODUCTNAME,
- &record.OA_PRODUCTVERSION,
- &record.OA_CPU,
- &record.OA_OPERATINGSYSTEM,
- &record.OA_MAINMAC,
- &record.OA_SECONDMAC,
- &record.OA_CREATIONDATE,
- &record.OA_CREATIONTIME,
- &record.OA_LASTOPERATEDATE,
- &record.OA_LASTOPERATETIME,
- ); err != nil {
- log.Println("Failed to scan row from source DB:", err)
- return lastSyncTime, 0, err
- }
- record.CaptureTime = sql.NullTime{
- Time: time.Now(),
- Valid: true,
- }
- // 插入新数据到目标数据库
- err := insertOALicRecordIntoTargetDB(targetDB, record)
- if err != nil {
- Logger.Println("Failed to insert record into target DB:", err)
- return lastSyncTime, 0, err
- }
- //fmt.Printf("Inserted record with OA_REQUESTID %d into target DB.\n", record.OA_REQUESTID.Int32)
- // 在第一次迭代时更新 newSyncTime 为当前记录的创建时间
- if firstRecord && record.OA_CREATIONDATE.Valid && record.OA_CREATIONTIME.Valid {
- crDate, _ := time.Parse("2006-01-02", record.OA_CREATIONDATE.String)
- crTime, _ := time.Parse("15:04:05", record.OA_CREATIONTIME.String)
- newSyncTime = time.Date(crDate.Year(), crDate.Month(), crDate.Day(), crTime.Hour(), crTime.Minute(), crTime.Second(), 0, time.Local)
- firstRecord = false
- }
- }
- // 如果有新数据同步,则更新 lastSyncTime
- if !newSyncTime.IsZero() {
- lastSyncTime = newSyncTime
- }
- return lastSyncTime, count, err
- }
- // insertOALicRecordIntoTargetDB 插入新记录到目标数据库
- func insertOALicRecordIntoTargetDB(targetDB *sql.DB, record OALicRecord) error {
- //fmt.Println("插入新记录到目标数据库")
- tx, err := targetDB.Begin()
- if err != nil {
- return fmt.Errorf("begin transaction: %v", err)
- }
- defer func() {
- if err != nil {
- tx.Rollback()
- } else {
- err = tx.Commit()
- }
- }()
- uniqueID := uuid.New()
- sql1 := `
- INSERT INTO target_OA_license (
- Unique_ID,
- OA_ID,
- OA_REQUESTID,
- OA_REQUESTNAME,
- OA_REQUESTNAMENEW,
- OA_REQUESTNAMEHTMLNEW,
- OA_GLXMID,
- OA_GLXMNAME,
- OA_SQSJ,
- OA_SALESPERSONNAME,
- OA_XSJSYX,
- OA_OPERATIONSPERSONNAME,
- OA_JFJSYX,
- OA_SYDW,
- OA_XMXXMS,
- OA_JDS,
- OA_NODECOUNT,
- OA_PRODUCTCODE,
- OA_PRODUCTNAME,
- OA_PRODUCTVERSION,
- OA_CPU,
- OA_OPERATINGSYSTEM,
- OA_MAINMAC,
- OA_SECONDMAC,
- OA_CREATIONDATE,
- OA_CREATIONTIME,
- OA_LASTOPERATEDATE,
- OA_LASTOPERATETIME,
- capture_Time
- ) VALUES (?,?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?,? ,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?, ?)`
- _, err = tx.Exec(sql1,
- uniqueID,
- record.OA_ID,
- record.OA_REQUESTID,
- record.OA_REQUESTNAME,
- record.OA_REQUESTNAMENEW,
- record.OA_REQUESTNAMEHTMLNEW,
- record.OA_GLXMID,
- record.OA_GLXMNAME,
- record.OA_SQSJ,
- record.OA_SALESPERSONNAME,
- record.OA_XSJSYX,
- record.OA_OPERATIONSPERSONNAME,
- record.OA_JFJSYX,
- record.OA_SYDW,
- record.OA_XMXXMS,
- record.OA_JDS,
- record.OA_NODECOUNT,
- record.OA_PRODUCTCODE,
- record.OA_PRODUCTNAME,
- record.OA_PRODUCTVERSION,
- record.OA_CPU,
- record.OA_OPERATINGSYSTEM,
- record.OA_MAINMAC,
- record.OA_SECONDMAC,
- record.OA_CREATIONDATE,
- record.OA_CREATIONTIME,
- record.OA_LASTOPERATEDATE,
- record.OA_LASTOPERATETIME,
- record.CaptureTime,
- )
- if err != nil {
- log.Println("Failed to sql1 insert record into target DB:", err)
- }
- sql2 := `
- INSERT INTO License_generate_Info (
- License_UniqueID,
- OA_ID,
- License_Flage)
- VALUES (?, ?,?)
- `
- _, err = tx.Exec(sql2,
- uniqueID,
- record.OA_ID,
- "未生成",
- )
- if err != nil {
- Logger.Println("Failed to sql2 insert record into target DB:", err)
- return err
- }
- return nil
- }
|