capture.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446
  1. package capture
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "log"
  6. "time"
  7. "github.com/google/uuid"
  8. )
  9. type OALicRecord struct {
  10. OA_ID sql.NullInt32 `json:"OA_ID"`
  11. OA_REQUESTID sql.NullInt32 `json:"oa_request_id"`
  12. OA_REQUESTNAME sql.NullString `json:"oa_request_name"`
  13. OA_REQUESTNAMENEW sql.NullString `json:"oa_request_name_new"`
  14. OA_REQUESTNAMEHTMLNEW sql.NullString `json:"oa_request_name_html_new"`
  15. OA_GLXMID sql.NullString `json:"oa_glxmid"`
  16. OA_GLXMNAME sql.NullString `json:"oa_glxmname"`
  17. OA_SQSJ sql.NullString `json:"oa_sqsj"`
  18. OA_SALESPERSONNAME sql.NullString `json:"oa_sales_person_name"`
  19. OA_XSJSYX sql.NullString `json:"oa_sales_email"`
  20. OA_OPERATIONSPERSONNAME sql.NullString `json:"oa_operations_person_name"`
  21. OA_JFJSYX sql.NullString `json:"oa_operations_email"`
  22. OA_SYDW sql.NullString `json:"oa_using_unit"`
  23. OA_XMXXMS sql.NullString `json:"oa_project_detail"`
  24. OA_JDS sql.NullInt32 `json:"oa_total_nodes"`
  25. OA_NODECOUNT sql.NullInt32 `json:"oa_node_count"`
  26. OA_PRODUCTCODE sql.NullString `json:"oa_product_code"`
  27. OA_PRODUCTNAME sql.NullString `json:"oa_product_name"`
  28. OA_PRODUCTVERSION sql.NullString `json:"oa_product_version"`
  29. OA_CPU sql.NullString `json:"oa_cpu"`
  30. OA_OPERATINGSYSTEM sql.NullString `json:"oa_operating_system"`
  31. OA_MAINMAC sql.NullString `json:"oa_main_mac"`
  32. OA_SECONDMAC sql.NullString `json:"oa_secondary_mac"`
  33. OA_CREATIONDATE sql.NullString `json:"oa_creation_date"`
  34. OA_CREATIONTIME sql.NullString `json:"oa_creation_time"`
  35. OA_LASTOPERATEDATE sql.NullString `json:"oa_last_operation_date"`
  36. OA_LASTOPERATETIME sql.NullString `json:"oa_last_operation_time"`
  37. CaptureTime sql.NullTime `json:"capture_time"`
  38. }
  39. func StartCapture(sourceDB, targetDB *sql.DB) {
  40. InitLogs("./logs/capture", "info")
  41. tableName := "target_OA_license"
  42. // 读取上次同步时间
  43. lastSyncTime, err := getLastSyncTime(targetDB, tableName)
  44. if err != nil {
  45. Logger.Fatal("获取上次同步时间错误 Failed to retrieve last sync time from target DB:", err)
  46. }
  47. // 启动数据同步,每隔15分钟运行一次
  48. for {
  49. Logger.Println("Start capturing data...")
  50. checkTemp, err := checkSourceOaLicTime(sourceDB, lastSyncTime)
  51. if err != nil {
  52. Logger.Error("Failed to checkSourceOaLicTime:", err)
  53. continue
  54. }
  55. if !checkTemp {
  56. time.Sleep(15 * time.Second)
  57. continue
  58. }
  59. Logger.Println("有新数据")
  60. lastSyncTime1, count, err := captureData(sourceDB, targetDB, lastSyncTime)
  61. lastSyncTime = lastSyncTime1
  62. if err != nil {
  63. Logger.Error("抓取源数据错误 Failed to capture data:", err)
  64. }
  65. if err := insertSyncMetadata(targetDB, tableName, lastSyncTime); err != nil {
  66. Logger.Error("插入同步时间表错误 Failed to insert sync metadata into target DB:", err)
  67. }
  68. Logger.Println("插入时间: ", lastSyncTime)
  69. if count > 0 {
  70. //lic关联用户表
  71. err := insertLicToUser(count, targetDB)
  72. if err != nil {
  73. Logger.Error("关联用户表错误 Failed to insertLicToUser:", err)
  74. }
  75. }
  76. time.Sleep(15 * time.Second)
  77. }
  78. }
  79. func insertLicToUser(count int, targetDB *sql.DB) error {
  80. //查询抓取表中最新的count条数据
  81. query := "SELECT UNIQUE_ID,OA_SALESPERSONNAME, OA_OPERATIONSPERSONNAME FROM target_OA_license ORDER BY capture_time DESC LIMIT ?"
  82. rows, err := targetDB.Query(query, count)
  83. if err != nil {
  84. Logger.Println("查询源库最新数据错误 Failed to execute query on source DB:", err)
  85. return err
  86. }
  87. defer rows.Close()
  88. Logger.Println("查询到最新的count条数据 ", count)
  89. for rows.Next() {
  90. var licUniqueID, salesPersonName, operationsPersonName sql.NullString
  91. err := rows.Scan(&licUniqueID, &salesPersonName, &operationsPersonName)
  92. if err != nil {
  93. Logger.Error("插入到目标库错误 insertLicToUser find newdata Failed to scan row:", err)
  94. continue
  95. }
  96. if salesPersonName.Valid || operationsPersonName.Valid != false {
  97. //将获取到的字段,和用户表关联对比
  98. var userUNIQUEID, userACCOUNT sql.NullString
  99. queryLIC_USER := `SELECT UNIQUEID, ACCOUNT FROM LIC_USER where
  100. (USERNAME = ? OR USERNAME = ? )
  101. and (role = 'ADMIN' OR role = 'supportRole' OR role = 'salesRole');`
  102. err = targetDB.QueryRow(queryLIC_USER, salesPersonName, operationsPersonName).Scan(&userUNIQUEID, &userACCOUNT)
  103. if err == sql.ErrNoRows {
  104. Logger.Warn("No rows were returned")
  105. continue
  106. }
  107. if err != nil {
  108. Logger.Error("find user name Failed to scan row :", err)
  109. continue
  110. }
  111. //fmt.Println("关联到用户 ", userUNIQUEID, userACCOUNT)
  112. insertQuery := "INSERT INTO LICENSERECORDTOUSER(LICENSE_UNIQUEID,USER_UNIQUEID,USER_ACCOUNT,OPERATOR_UNIQUEID,UP_TIME) VALUES(?,?,?,'SYSTEM_AUTO',?)"
  113. _, err = targetDB.Exec(insertQuery,
  114. licUniqueID,
  115. userUNIQUEID,
  116. userACCOUNT,
  117. time.Now(),
  118. )
  119. if err != nil {
  120. Logger.Error("lic关联到目标库用户错误 Failed to sql2 insert record into target DB:", err)
  121. }
  122. }
  123. }
  124. return nil
  125. }
  126. // CaptureOnce 主动触发一次数据抓取
  127. func CaptureOnce(sourceDB, targetDB *sql.DB) error {
  128. tableName := "target_OA_license"
  129. // 读取上次同步时间
  130. lastSyncTime, err := getLastSyncTime(targetDB, tableName)
  131. if err != nil {
  132. Logger.Error("Failed to retrieve last sync time from target DB:", err)
  133. return err
  134. }
  135. // 执行一次数据抓取
  136. lastSyncTime, count, err := captureData(sourceDB, targetDB, lastSyncTime)
  137. if err != nil {
  138. Logger.Error("Failed to capture data:", err)
  139. return err
  140. }
  141. if err := insertSyncMetadata(targetDB, tableName, lastSyncTime); err != nil {
  142. Logger.Error("Failed to insert sync metadata into target DB:", err)
  143. return err
  144. }
  145. Logger.Println("Captured", count, "records.")
  146. if count > 0 {
  147. //lic关联用户表
  148. err := insertLicToUser(count, targetDB)
  149. if err != nil {
  150. Logger.Error("insertLicToUser Failed to insert record into target DB:", err)
  151. return err
  152. }
  153. }
  154. return nil
  155. }
  156. // getLastSyncTime 从数据库中获取指定表的上次同步时间
  157. func getLastSyncTime(db *sql.DB, tableName string) (time.Time, error) {
  158. var lastSyncTime time.Time
  159. query := `
  160. SELECT last_sync_time
  161. FROM sync_metadata
  162. WHERE table_name = ?
  163. ORDER BY last_sync_time DESC
  164. LIMIT 1
  165. `
  166. err := db.QueryRow(query, tableName).Scan(&lastSyncTime)
  167. if err != nil {
  168. return time.Time{}, err
  169. }
  170. return lastSyncTime, nil
  171. }
  172. // insertSyncMetadata 插入新的同步元数据
  173. func insertSyncMetadata(db *sql.DB, tableName string, lastSyncTime time.Time) error {
  174. query := `
  175. INSERT INTO sync_metadata (table_name, last_sync_time)
  176. VALUES (?, ?)
  177. `
  178. _, err := db.Exec(query, tableName, lastSyncTime)
  179. return err
  180. }
  181. func checkSourceOaLicTime(sourceDB *sql.DB, lastSyncTime time.Time) (bool, error) {
  182. var checkCount int
  183. checkTime := `SELECT count(1) FROM XUGU.WORKFLOW_REQUESTBASE WR
  184. where
  185. TO_TIMESTAMP(wr.CREATEDATE || ' ' || wr.CREATETIME, 'YYYY-MM-DD HH24:MI:SS') > TO_TIMESTAMP(?, 'YYYY-MM-DD HH24:MI:SS');`
  186. error := sourceDB.QueryRow(checkTime, lastSyncTime).Scan(&checkCount)
  187. if error != nil {
  188. Logger.Println("Failed to check count:", error)
  189. return false, error
  190. }
  191. if checkCount == 0 {
  192. Logger.Println("没有新数据需要同步")
  193. return false, nil
  194. }
  195. return true, nil
  196. }
  197. // captureData 执行数据抓取和同步
  198. func captureData(sourceDB, targetDB *sql.DB, lastSyncTime time.Time) (time.Time, int, error) {
  199. query := `
  200. SELECT
  201. fmd.id ,
  202. fm.REQUESTID,
  203. wr.REQUESTNAME,
  204. wr.REQUESTNAMENEW,
  205. wr.REQUESTNAMEHTMLNEW,
  206. fm.glxm,
  207. PP.NAME,
  208. fm.SQSJ,
  209. hrm1.LASTNAME,
  210. hrm2.LASTNAME,
  211. fm.XSJSYX,
  212. fm.JFJSYX,
  213. fm.SYDW,
  214. fm.XMXXMS,
  215. fm.JDS,
  216. fmd.JDS,
  217. ws1.SELECTVALUE,
  218. ws1.SELECTNAME,
  219. ws2.SELECTNAME,
  220. fmd.CLQ,
  221. fmd.CZXT,
  222. fmd.IP,
  223. fmd.MAC,
  224. wr.CREATEDATE,
  225. wr.CREATETIME,
  226. wr.LASTOPERATEDATE,
  227. wr.LASTOPERATETIME
  228. FROM XUGU.formtable_main_146 fm
  229. LEFT JOIN XUGU.HRMRESOURCE hrm1 ON TO_NUMBER(fm.XSRY) = hrm1.id
  230. LEFT JOIN XUGU.HRMRESOURCE hrm2 ON TO_NUMBER(fm.jfry) = hrm2.id
  231. LEFT JOIN XUGU.FORMTABLE_MAIN_146_dt1 fmd ON fmd.mainid = fm.id
  232. LEFT JOIN XUGU.WORKFLOW_SELECTITEM ws1 ON fmd.cpmc = ws1.SELECTVALUE AND ws1.FIELDID = 14627
  233. LEFT JOIN XUGU.WORKFLOW_SELECTITEM ws2 ON fmd.BB = ws2.SELECTVALUE AND ws2.FIELDID = 14628
  234. LEFT JOIN XUGU.WORKFLOW_REQUESTBASE WR ON fm.REQUESTID = WR.REQUESTID
  235. LEFT JOIN XUGU.PRJ_PROJECTINFO PP ON fm.glxm = PP.ID
  236. WHERE TO_TIMESTAMP(wr.CREATEDATE || ' ' || wr.CREATETIME, 'YYYY-MM-DD HH24:MI:SS') > TO_TIMESTAMP(?, 'YYYY-MM-DD HH24:MI:SS')
  237. ORDER BY wr.CREATEDATE DESC, wr.CREATETIME DESC
  238. PARALLEL 8`
  239. var count int
  240. rows, err := sourceDB.Query(query, lastSyncTime.Format("2006-01-02 15:04:05"))
  241. if err != nil {
  242. Logger.Println("Failed to execute query on source DB:", err)
  243. return lastSyncTime, 0, err
  244. }
  245. defer rows.Close()
  246. var newSyncTime time.Time
  247. firstRecord := true
  248. for rows.Next() {
  249. var record OALicRecord
  250. count++
  251. if err := rows.Scan(
  252. &record.OA_ID,
  253. &record.OA_REQUESTID,
  254. &record.OA_REQUESTNAME,
  255. &record.OA_REQUESTNAMENEW,
  256. &record.OA_REQUESTNAMEHTMLNEW,
  257. &record.OA_GLXMID,
  258. &record.OA_GLXMNAME,
  259. &record.OA_SQSJ,
  260. &record.OA_SALESPERSONNAME,
  261. &record.OA_OPERATIONSPERSONNAME,
  262. &record.OA_XSJSYX,
  263. &record.OA_JFJSYX,
  264. &record.OA_SYDW,
  265. &record.OA_XMXXMS,
  266. &record.OA_JDS,
  267. &record.OA_NODECOUNT,
  268. &record.OA_PRODUCTCODE,
  269. &record.OA_PRODUCTNAME,
  270. &record.OA_PRODUCTVERSION,
  271. &record.OA_CPU,
  272. &record.OA_OPERATINGSYSTEM,
  273. &record.OA_MAINMAC,
  274. &record.OA_SECONDMAC,
  275. &record.OA_CREATIONDATE,
  276. &record.OA_CREATIONTIME,
  277. &record.OA_LASTOPERATEDATE,
  278. &record.OA_LASTOPERATETIME,
  279. ); err != nil {
  280. log.Println("Failed to scan row from source DB:", err)
  281. return lastSyncTime, 0, err
  282. }
  283. record.CaptureTime = sql.NullTime{
  284. Time: time.Now(),
  285. Valid: true,
  286. }
  287. // 插入新数据到目标数据库
  288. err := insertOALicRecordIntoTargetDB(targetDB, record)
  289. if err != nil {
  290. Logger.Println("Failed to insert record into target DB:", err)
  291. return lastSyncTime, 0, err
  292. }
  293. //fmt.Printf("Inserted record with OA_REQUESTID %d into target DB.\n", record.OA_REQUESTID.Int32)
  294. // 在第一次迭代时更新 newSyncTime 为当前记录的创建时间
  295. if firstRecord && record.OA_CREATIONDATE.Valid && record.OA_CREATIONTIME.Valid {
  296. crDate, _ := time.Parse("2006-01-02", record.OA_CREATIONDATE.String)
  297. crTime, _ := time.Parse("15:04:05", record.OA_CREATIONTIME.String)
  298. newSyncTime = time.Date(crDate.Year(), crDate.Month(), crDate.Day(), crTime.Hour(), crTime.Minute(), crTime.Second(), 0, time.Local)
  299. firstRecord = false
  300. }
  301. }
  302. // 如果有新数据同步,则更新 lastSyncTime
  303. if !newSyncTime.IsZero() {
  304. lastSyncTime = newSyncTime
  305. }
  306. return lastSyncTime, count, err
  307. }
  308. // insertOALicRecordIntoTargetDB 插入新记录到目标数据库
  309. func insertOALicRecordIntoTargetDB(targetDB *sql.DB, record OALicRecord) error {
  310. //fmt.Println("插入新记录到目标数据库")
  311. tx, err := targetDB.Begin()
  312. if err != nil {
  313. return fmt.Errorf("begin transaction: %v", err)
  314. }
  315. defer func() {
  316. if err != nil {
  317. tx.Rollback()
  318. } else {
  319. err = tx.Commit()
  320. }
  321. }()
  322. uniqueID := uuid.New()
  323. sql1 := `
  324. INSERT INTO target_OA_license (
  325. Unique_ID,
  326. OA_ID,
  327. OA_REQUESTID,
  328. OA_REQUESTNAME,
  329. OA_REQUESTNAMENEW,
  330. OA_REQUESTNAMEHTMLNEW,
  331. OA_GLXMID,
  332. OA_GLXMNAME,
  333. OA_SQSJ,
  334. OA_SALESPERSONNAME,
  335. OA_XSJSYX,
  336. OA_OPERATIONSPERSONNAME,
  337. OA_JFJSYX,
  338. OA_SYDW,
  339. OA_XMXXMS,
  340. OA_JDS,
  341. OA_NODECOUNT,
  342. OA_PRODUCTCODE,
  343. OA_PRODUCTNAME,
  344. OA_PRODUCTVERSION,
  345. OA_CPU,
  346. OA_OPERATINGSYSTEM,
  347. OA_MAINMAC,
  348. OA_SECONDMAC,
  349. OA_CREATIONDATE,
  350. OA_CREATIONTIME,
  351. OA_LASTOPERATEDATE,
  352. OA_LASTOPERATETIME,
  353. capture_Time
  354. ) VALUES (?,?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?,? ,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?, ?)`
  355. _, err = tx.Exec(sql1,
  356. uniqueID,
  357. record.OA_ID,
  358. record.OA_REQUESTID,
  359. record.OA_REQUESTNAME,
  360. record.OA_REQUESTNAMENEW,
  361. record.OA_REQUESTNAMEHTMLNEW,
  362. record.OA_GLXMID,
  363. record.OA_GLXMNAME,
  364. record.OA_SQSJ,
  365. record.OA_SALESPERSONNAME,
  366. record.OA_XSJSYX,
  367. record.OA_OPERATIONSPERSONNAME,
  368. record.OA_JFJSYX,
  369. record.OA_SYDW,
  370. record.OA_XMXXMS,
  371. record.OA_JDS,
  372. record.OA_NODECOUNT,
  373. record.OA_PRODUCTCODE,
  374. record.OA_PRODUCTNAME,
  375. record.OA_PRODUCTVERSION,
  376. record.OA_CPU,
  377. record.OA_OPERATINGSYSTEM,
  378. record.OA_MAINMAC,
  379. record.OA_SECONDMAC,
  380. record.OA_CREATIONDATE,
  381. record.OA_CREATIONTIME,
  382. record.OA_LASTOPERATEDATE,
  383. record.OA_LASTOPERATETIME,
  384. record.CaptureTime,
  385. )
  386. if err != nil {
  387. log.Println("Failed to sql1 insert record into target DB:", err)
  388. }
  389. sql2 := `
  390. INSERT INTO License_generate_Info (
  391. License_UniqueID,
  392. OA_ID,
  393. License_Flage)
  394. VALUES (?, ?,?)
  395. `
  396. _, err = tx.Exec(sql2,
  397. uniqueID,
  398. record.OA_ID,
  399. "未生成",
  400. )
  401. if err != nil {
  402. Logger.Println("Failed to sql2 insert record into target DB:", err)
  403. return err
  404. }
  405. return nil
  406. }