capture.go 13 KB


  1. package capture
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "log"
  6. "time"
  7. "github.com/google/uuid"
  8. )
  9. type OALicRecord struct {
  10. OA_ID sql.NullInt32 `json:"OA_ID"`
  11. OA_REQUESTID sql.NullInt32 `json:"oa_request_id"`
  12. OA_REQUESTNAME sql.NullString `json:"oa_request_name"`
  13. OA_REQUESTNAMENEW sql.NullString `json:"oa_request_name_new"`
  14. OA_REQUESTNAMEHTMLNEW sql.NullString `json:"oa_request_name_html_new"`
  15. OA_GLXMID sql.NullString `json:"oa_glxmid"`
  16. OA_GLXMNAME sql.NullString `json:"oa_glxmname"`
  17. OA_SQSJ sql.NullString `json:"oa_sqsj"`
  18. OA_SALESPERSONNAME sql.NullString `json:"oa_sales_person_name"`
  19. OA_XSJSYX sql.NullString `json:"oa_sales_email"`
  20. OA_OPERATIONSPERSONNAME sql.NullString `json:"oa_operations_person_name"`
  21. OA_JFJSYX sql.NullString `json:"oa_operations_email"`
  22. OA_SYDW sql.NullString `json:"oa_using_unit"`
  23. OA_XMXXMS sql.NullString `json:"oa_project_detail"`
  24. OA_JDS sql.NullInt32 `json:"oa_total_nodes"`
  25. OA_NODECOUNT sql.NullInt32 `json:"oa_node_count"`
  26. OA_PRODUCTCODE sql.NullString `json:"oa_product_code"`
  27. OA_PRODUCTNAME sql.NullString `json:"oa_product_name"`
  28. OA_PRODUCTVERSION sql.NullString `json:"oa_product_version"`
  29. OA_CPU sql.NullString `json:"oa_cpu"`
  30. OA_OPERATINGSYSTEM sql.NullString `json:"oa_operating_system"`
  31. OA_MAINMAC sql.NullString `json:"oa_main_mac"`
  32. OA_SECONDMAC sql.NullString `json:"oa_secondary_mac"`
  33. OA_CREATIONDATE sql.NullString `json:"oa_creation_date"`
  34. OA_CREATIONTIME sql.NullString `json:"oa_creation_time"`
  35. OA_LASTOPERATEDATE sql.NullString `json:"oa_last_operation_date"`
  36. OA_LASTOPERATETIME sql.NullString `json:"oa_last_operation_time"`
  37. CaptureTime sql.NullTime `json:"capture_time"`
  38. }
  39. func StartCapture(sourceDB, targetDB *sql.DB) {
  40. InitLogs("./logs/capture", "info")
  41. tableName := "target_OA_license"
  42. // 读取上次同步时间
  43. lastSyncTime, err := getLastSyncTime(targetDB, tableName)
  44. if err != nil {
  45. Logger.Fatal("获取上次同步时间错误 Failed to retrieve last sync time from target DB:", err)
  46. }
  47. // 启动数据同步,每隔15分钟运行一次
  48. for {
  49. //Logger.Println("Start capturing data...")
  50. checkTemp, err := checkSourceOaLicTime(sourceDB, lastSyncTime)
  51. if err != nil {
  52. Logger.Error("Failed to checkSourceOaLicTime:", err)
  53. continue
  54. }
  55. if !checkTemp {
  56. time.Sleep(15 * time.Second)
  57. continue
  58. }
  59. Logger.Println("有新数据")
  60. lastSyncTime1, count, err := captureData(sourceDB, targetDB, lastSyncTime)
  61. lastSyncTime = lastSyncTime1
  62. if err != nil {
  63. Logger.Error("抓取源数据错误 Failed to capture data:", err)
  64. }
  65. if err := insertSyncMetadata(targetDB, tableName, lastSyncTime); err != nil {
  66. Logger.Error("插入同步时间表错误 Failed to insert sync metadata into target DB:", err)
  67. }
  68. Logger.Println("插入时间: ", lastSyncTime)
  69. if count > 0 {
  70. //lic关联用户表
  71. err := insertLicToUser(count, targetDB)
  72. if err != nil {
  73. Logger.Error("关联用户表错误 Failed to insertLicToUser:", err)
  74. }
  75. }
  76. time.Sleep(15 * time.Second)
  77. }
  78. }
  79. func insertLicToUser(count int, targetDB *sql.DB) error {
  80. //查询抓取表中最新的count条数据
  81. query := "SELECT OA_REQUESTID,UNIQUE_ID, OA_OPERATIONSPERSONNAME FROM target_OA_license ORDER BY capture_time DESC LIMIT ?"
  82. rows, err := targetDB.Query(query, count)
  83. if err != nil {
  84. Logger.Println("查询源库最新数据错误 Failed to execute query on source DB:", err)
  85. return err
  86. }
  87. defer rows.Close()
  88. Logger.Println("查询到最新的count条数据 ", count)
  89. for rows.Next() {
  90. var OARequestID, licUniqueID, operationsPersonName sql.NullString
  91. err := rows.Scan(&OARequestID, &licUniqueID, &operationsPersonName)
  92. if err != nil {
  93. Logger.Error("插入到目标库错误 insertLicToUser find newdata Failed to scan row:", err)
  94. continue
  95. }
  96. if operationsPersonName.Valid != false {
  97. //将获取到的字段,和用户表关联对比
  98. var userUNIQUEID, userACCOUNT sql.NullString
  99. queryLIC_USER := `SELECT UNIQUEID, ACCOUNT FROM LIC_USER where
  100. (USERNAME = ? )
  101. and ( role = 'supportRole' );`
  102. err = targetDB.QueryRow(queryLIC_USER, operationsPersonName).Scan(&userUNIQUEID, &userACCOUNT)
  103. if err == sql.ErrNoRows {
  104. Logger.Warn("No rows were returned")
  105. continue
  106. }
  107. if err != nil {
  108. Logger.Error("find user name Failed to scan row :", err)
  109. continue
  110. }
  111. if userUNIQUEID.Valid && userACCOUNT.Valid {
  112. //fmt.Println("关联到用户 ", userUNIQUEID, userACCOUNT)
  113. insertQuery := "INSERT INTO LICENSERECORDTOUSER(OA_REQUESTID,LICENSE_UNIQUEID,USER_UNIQUEID,USER_ACCOUNT,OPERATOR_UNIQUEID,UP_TIME) VALUES(?,?,?,?,'SYSTEM_AUTO',?)"
  114. _, err = targetDB.Exec(insertQuery,
  115. OARequestID,
  116. licUniqueID,
  117. userUNIQUEID,
  118. userACCOUNT,
  119. time.Now(),
  120. )
  121. if err != nil {
  122. Logger.Error("lic关联到目标库用户错误 Failed to sql2 insert record into target DB:", err)
  123. }
  124. }
  125. }
  126. }
  127. return nil
  128. }
  129. // CaptureOnce 主动触发一次数据抓取
  130. func CaptureOnce(sourceDB, targetDB *sql.DB) error {
  131. tableName := "target_OA_license"
  132. // 读取上次同步时间
  133. lastSyncTime, err := getLastSyncTime(targetDB, tableName)
  134. if err != nil {
  135. Logger.Error("Failed to retrieve last sync time from target DB:", err)
  136. return err
  137. }
  138. // 执行一次数据抓取
  139. lastSyncTime, count, err := captureData(sourceDB, targetDB, lastSyncTime)
  140. if err != nil {
  141. Logger.Error("Failed to capture data:", err)
  142. return err
  143. }
  144. if err := insertSyncMetadata(targetDB, tableName, lastSyncTime); err != nil {
  145. Logger.Error("Failed to insert sync metadata into target DB:", err)
  146. return err
  147. }
  148. Logger.Println("Captured", count, "records.")
  149. if count > 0 {
  150. //lic关联用户表
  151. err := insertLicToUser(count, targetDB)
  152. if err != nil {
  153. Logger.Error("insertLicToUser Failed to insert record into target DB:", err)
  154. return err
  155. }
  156. }
  157. return nil
  158. }
  159. // getLastSyncTime 从数据库中获取指定表的上次同步时间
  160. func getLastSyncTime(db *sql.DB, tableName string) (time.Time, error) {
  161. var lastSyncTime time.Time
  162. query := `
  163. SELECT last_sync_time
  164. FROM sync_metadata
  165. WHERE table_name = ?
  166. ORDER BY last_sync_time DESC
  167. LIMIT 1
  168. `
  169. err := db.QueryRow(query, tableName).Scan(&lastSyncTime)
  170. if err != nil {
  171. return time.Time{}, err
  172. }
  173. return lastSyncTime, nil
  174. }
  175. // insertSyncMetadata 插入新的同步元数据
  176. func insertSyncMetadata(db *sql.DB, tableName string, lastSyncTime time.Time) error {
  177. query := `
  178. INSERT INTO sync_metadata (table_name, last_sync_time)
  179. VALUES (?, ?)
  180. `
  181. _, err := db.Exec(query, tableName, lastSyncTime)
  182. return err
  183. }
  184. func checkSourceOaLicTime(sourceDB *sql.DB, lastSyncTime time.Time) (bool, error) {
  185. var checkCount int
  186. checkTime := `SELECT count(1) FROM XUGU.WORKFLOW_REQUESTBASE WR
  187. where
  188. TO_TIMESTAMP(wr.CREATEDATE || ' ' || wr.CREATETIME, 'YYYY-MM-DD HH24:MI:SS') > TO_TIMESTAMP(?, 'YYYY-MM-DD HH24:MI:SS');`
  189. error := sourceDB.QueryRow(checkTime, lastSyncTime).Scan(&checkCount)
  190. if error != nil {
  191. Logger.Println("Failed to check count:", error)
  192. return false, error
  193. }
  194. if checkCount == 0 {
  195. //Logger.Println("没有新数据需要同步")
  196. return false, nil
  197. }
  198. return true, nil
  199. }
  200. // captureData 执行数据抓取和同步
  201. func captureData(sourceDB, targetDB *sql.DB, lastSyncTime time.Time) (time.Time, int, error) {
  202. query := `
  203. SELECT
  204. fmd.id ,
  205. fm.REQUESTID,
  206. wr.REQUESTNAME,
  207. wr.REQUESTNAMENEW,
  208. wr.REQUESTNAMEHTMLNEW,
  209. fm.glxm,
  210. PP.NAME,
  211. fm.SQSJ,
  212. hrm1.LASTNAME,
  213. hrm2.LASTNAME,
  214. fm.XSJSYX,
  215. fm.JFJSYX,
  216. fm.SYDW,
  217. fm.XMXXMS,
  218. fm.JDS,
  219. fmd.JDS,
  220. ws1.SELECTVALUE,
  221. ws1.SELECTNAME,
  222. ws2.SELECTNAME,
  223. fmd.CLQ,
  224. fmd.CZXT,
  225. fmd.IP,
  226. fmd.MAC,
  227. wr.CREATEDATE,
  228. wr.CREATETIME,
  229. wr.LASTOPERATEDATE,
  230. wr.LASTOPERATETIME
  231. FROM XUGU.formtable_main_146 fm
  232. LEFT JOIN XUGU.HRMRESOURCE hrm1 ON TO_NUMBER(fm.XSRY) = hrm1.id
  233. LEFT JOIN XUGU.HRMRESOURCE hrm2 ON TO_NUMBER(fm.jfry) = hrm2.id
  234. LEFT JOIN XUGU.FORMTABLE_MAIN_146_dt1 fmd ON fmd.mainid = fm.id
  235. LEFT JOIN XUGU.WORKFLOW_SELECTITEM ws1 ON fmd.cpmc = ws1.SELECTVALUE AND ws1.FIELDID = 14627
  236. LEFT JOIN XUGU.WORKFLOW_SELECTITEM ws2 ON fmd.BB = ws2.SELECTVALUE AND ws2.FIELDID = 14628
  237. LEFT JOIN XUGU.WORKFLOW_REQUESTBASE WR ON fm.REQUESTID = WR.REQUESTID
  238. LEFT JOIN XUGU.PRJ_PROJECTINFO PP ON fm.glxm = PP.ID
  239. WHERE TO_TIMESTAMP(wr.CREATEDATE || ' ' || wr.CREATETIME, 'YYYY-MM-DD HH24:MI:SS') > TO_TIMESTAMP(?, 'YYYY-MM-DD HH24:MI:SS')
  240. ORDER BY wr.CREATEDATE DESC, wr.CREATETIME DESC
  241. PARALLEL 8`
  242. var count int
  243. rows, err := sourceDB.Query(query, lastSyncTime.Format("2006-01-02 15:04:05"))
  244. if err != nil {
  245. Logger.Println("Failed to execute query on source DB:", err)
  246. return lastSyncTime, 0, err
  247. }
  248. defer rows.Close()
  249. var newSyncTime time.Time
  250. firstRecord := true
  251. for rows.Next() {
  252. var record OALicRecord
  253. count++
  254. if err := rows.Scan(
  255. &record.OA_ID,
  256. &record.OA_REQUESTID,
  257. &record.OA_REQUESTNAME,
  258. &record.OA_REQUESTNAMENEW,
  259. &record.OA_REQUESTNAMEHTMLNEW,
  260. &record.OA_GLXMID,
  261. &record.OA_GLXMNAME,
  262. &record.OA_SQSJ,
  263. &record.OA_SALESPERSONNAME,
  264. &record.OA_OPERATIONSPERSONNAME,
  265. &record.OA_XSJSYX,
  266. &record.OA_JFJSYX,
  267. &record.OA_SYDW,
  268. &record.OA_XMXXMS,
  269. &record.OA_JDS,
  270. &record.OA_NODECOUNT,
  271. &record.OA_PRODUCTCODE,
  272. &record.OA_PRODUCTNAME,
  273. &record.OA_PRODUCTVERSION,
  274. &record.OA_CPU,
  275. &record.OA_OPERATINGSYSTEM,
  276. &record.OA_MAINMAC,
  277. &record.OA_SECONDMAC,
  278. &record.OA_CREATIONDATE,
  279. &record.OA_CREATIONTIME,
  280. &record.OA_LASTOPERATEDATE,
  281. &record.OA_LASTOPERATETIME,
  282. ); err != nil {
  283. log.Println("Failed to scan row from source DB:", err)
  284. return lastSyncTime, 0, err
  285. }
  286. record.CaptureTime = sql.NullTime{
  287. Time: time.Now(),
  288. Valid: true,
  289. }
  290. // 插入新数据到目标数据库
  291. err := insertOALicRecordIntoTargetDB(targetDB, record)
  292. if err != nil {
  293. Logger.Println("Failed to insert record into target DB:", err)
  294. return lastSyncTime, 0, err
  295. }
  296. //fmt.Printf("Inserted record with OA_REQUESTID %d into target DB.\n", record.OA_REQUESTID.Int32)
  297. // 在第一次迭代时更新 newSyncTime 为当前记录的创建时间
  298. if firstRecord && record.OA_CREATIONDATE.Valid && record.OA_CREATIONTIME.Valid {
  299. crDate, _ := time.Parse("2006-01-02", record.OA_CREATIONDATE.String)
  300. crTime, _ := time.Parse("15:04:05", record.OA_CREATIONTIME.String)
  301. newSyncTime = time.Date(crDate.Year(), crDate.Month(), crDate.Day(), crTime.Hour(), crTime.Minute(), crTime.Second(), 0, time.Local)
  302. firstRecord = false
  303. }
  304. }
  305. // 如果有新数据同步,则更新 lastSyncTime
  306. if !newSyncTime.IsZero() {
  307. lastSyncTime = newSyncTime
  308. }
  309. return lastSyncTime, count, err
  310. }
  311. // insertOALicRecordIntoTargetDB 插入新记录到目标数据库
  312. func insertOALicRecordIntoTargetDB(targetDB *sql.DB, record OALicRecord) error {
  313. //fmt.Println("插入新记录到目标数据库")
  314. tx, err := targetDB.Begin()
  315. if err != nil {
  316. return fmt.Errorf("begin transaction: %v", err)
  317. }
  318. defer func() {
  319. if err != nil {
  320. tx.Rollback()
  321. } else {
  322. err = tx.Commit()
  323. }
  324. }()
  325. uniqueID := uuid.New()
  326. sql1 := `
  327. INSERT INTO target_OA_license (
  328. Unique_ID,
  329. OA_ID,
  330. OA_REQUESTID,
  331. OA_REQUESTNAME,
  332. OA_REQUESTNAMENEW,
  333. OA_REQUESTNAMEHTMLNEW,
  334. OA_GLXMID,
  335. OA_GLXMNAME,
  336. OA_SQSJ,
  337. OA_SALESPERSONNAME,
  338. OA_XSJSYX,
  339. OA_OPERATIONSPERSONNAME,
  340. OA_JFJSYX,
  341. OA_SYDW,
  342. OA_XMXXMS,
  343. OA_JDS,
  344. OA_NODECOUNT,
  345. OA_PRODUCTCODE,
  346. OA_PRODUCTNAME,
  347. OA_PRODUCTVERSION,
  348. OA_CPU,
  349. OA_OPERATINGSYSTEM,
  350. OA_MAINMAC,
  351. OA_SECONDMAC,
  352. OA_CREATIONDATE,
  353. OA_CREATIONTIME,
  354. OA_LASTOPERATEDATE,
  355. OA_LASTOPERATETIME,
  356. capture_Time
  357. ) VALUES (?,?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?,? ,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?, ?)`
  358. _, err = tx.Exec(sql1,
  359. uniqueID,
  360. record.OA_ID,
  361. record.OA_REQUESTID,
  362. record.OA_REQUESTNAME,
  363. record.OA_REQUESTNAMENEW,
  364. record.OA_REQUESTNAMEHTMLNEW,
  365. record.OA_GLXMID,
  366. record.OA_GLXMNAME,
  367. record.OA_SQSJ,
  368. record.OA_SALESPERSONNAME,
  369. record.OA_XSJSYX,
  370. record.OA_OPERATIONSPERSONNAME,
  371. record.OA_JFJSYX,
  372. record.OA_SYDW,
  373. record.OA_XMXXMS,
  374. record.OA_JDS,
  375. record.OA_NODECOUNT,
  376. record.OA_PRODUCTCODE,
  377. record.OA_PRODUCTNAME,
  378. record.OA_PRODUCTVERSION,
  379. record.OA_CPU,
  380. record.OA_OPERATINGSYSTEM,
  381. record.OA_MAINMAC,
  382. record.OA_SECONDMAC,
  383. record.OA_CREATIONDATE,
  384. record.OA_CREATIONTIME,
  385. record.OA_LASTOPERATEDATE,
  386. record.OA_LASTOPERATETIME,
  387. record.CaptureTime,
  388. )
  389. if err != nil {
  390. log.Println("Failed to sql1 insert record into target DB:", err)
  391. }
  392. sql2 := `
  393. INSERT INTO License_generate_Info (
  394. License_UniqueID,
  395. OA_ID,
  396. License_Flage)
  397. VALUES (?, ?,?)
  398. `
  399. _, err = tx.Exec(sql2,
  400. uniqueID,
  401. record.OA_ID,
  402. "未生成",
  403. )
  404. if err != nil {
  405. Logger.Println("Failed to sql2 insert record into target DB:", err)
  406. return err
  407. }
  408. return nil
  409. }