capture.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443
  1. package capture
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "log"
  6. "time"
  7. "github.com/google/uuid"
  8. )
  9. type OALicRecord struct {
  10. OA_ID sql.NullInt32 `json:"OA_ID"`
  11. OA_REQUESTID sql.NullInt32 `json:"oa_request_id"`
  12. OA_REQUESTNAME sql.NullString `json:"oa_request_name"`
  13. OA_REQUESTNAMENEW sql.NullString `json:"oa_request_name_new"`
  14. OA_REQUESTNAMEHTMLNEW sql.NullString `json:"oa_request_name_html_new"`
  15. OA_GLXMID sql.NullString `json:"oa_glxmid"`
  16. OA_GLXMNAME sql.NullString `json:"oa_glxmname"`
  17. OA_SQSJ sql.NullString `json:"oa_sqsj"`
  18. OA_SALESPERSONNAME sql.NullString `json:"oa_sales_person_name"`
  19. OA_XSJSYX sql.NullString `json:"oa_sales_email"`
  20. OA_OPERATIONSPERSONNAME sql.NullString `json:"oa_operations_person_name"`
  21. OA_JFJSYX sql.NullString `json:"oa_operations_email"`
  22. OA_SYDW sql.NullString `json:"oa_using_unit"`
  23. OA_XMXXMS sql.NullString `json:"oa_project_detail"`
  24. OA_JDS sql.NullInt32 `json:"oa_total_nodes"`
  25. OA_NODECOUNT sql.NullInt32 `json:"oa_node_count"`
  26. OA_PRODUCTCODE sql.NullString `json:"oa_product_code"`
  27. OA_PRODUCTNAME sql.NullString `json:"oa_product_name"`
  28. OA_PRODUCTVERSION sql.NullString `json:"oa_product_version"`
  29. OA_CPU sql.NullString `json:"oa_cpu"`
  30. OA_OPERATINGSYSTEM sql.NullString `json:"oa_operating_system"`
  31. OA_MAINMAC sql.NullString `json:"oa_main_mac"`
  32. OA_SECONDMAC sql.NullString `json:"oa_secondary_mac"`
  33. OA_CREATIONDATE sql.NullString `json:"oa_creation_date"`
  34. OA_CREATIONTIME sql.NullString `json:"oa_creation_time"`
  35. OA_LASTOPERATEDATE sql.NullString `json:"oa_last_operation_date"`
  36. OA_LASTOPERATETIME sql.NullString `json:"oa_last_operation_time"`
  37. CaptureTime sql.NullTime `json:"capture_time"`
  38. }
  39. func StartCapture(sourceDB, targetDB *sql.DB) {
  40. InitLogs("./logs/capture", "info")
  41. tableName := "target_OA_license"
  42. // 读取上次同步时间
  43. lastSyncTime, err := getLastSyncTime(targetDB, tableName)
  44. if err != nil {
  45. Logger.Fatal("获取上次同步时间错误 Failed to retrieve last sync time from target DB:", err)
  46. }
  47. // 启动数据同步,每隔15分钟运行一次
  48. for {
  49. Logger.Println("Start capturing data...")
  50. checkTemp, err := checkSourceOaLicTime(sourceDB, lastSyncTime)
  51. if err != nil {
  52. Logger.Error("Failed to checkSourceOaLicTime:", err)
  53. continue
  54. }
  55. if !checkTemp {
  56. time.Sleep(15 * time.Second)
  57. continue
  58. }
  59. Logger.Println("有新数据")
  60. lastSyncTime1, count, err := captureData(sourceDB, targetDB, lastSyncTime)
  61. lastSyncTime = lastSyncTime1
  62. if err != nil {
  63. Logger.Error("抓取源数据错误 Failed to capture data:", err)
  64. }
  65. if err := insertSyncMetadata(targetDB, tableName, lastSyncTime); err != nil {
  66. Logger.Error("插入同步时间表错误 Failed to insert sync metadata into target DB:", err)
  67. }
  68. Logger.Println("插入时间: ", lastSyncTime)
  69. if count > 0 {
  70. //lic关联用户表
  71. err := insertLicToUser(count, targetDB)
  72. if err != nil {
  73. Logger.Error("关联用户表错误 Failed to insertLicToUser:", err)
  74. }
  75. }
  76. time.Sleep(15 * time.Second)
  77. }
  78. }
  79. func insertLicToUser(count int, targetDB *sql.DB) error {
  80. //查询抓取表中最新的count条数据
  81. query := "SELECT UNIQUE_ID,OA_SALESPERSONNAME, OA_OPERATIONSPERSONNAME FROM target_OA_license ORDER BY capture_time DESC LIMIT ?"
  82. rows, err := targetDB.Query(query, count)
  83. if err != nil {
  84. Logger.Println("查询源库最新数据错误 Failed to execute query on source DB:", err)
  85. return err
  86. }
  87. defer rows.Close()
  88. Logger.Println("查询到最新的count条数据 ", count)
  89. for rows.Next() {
  90. var licUniqueID, salesPersonName, operationsPersonName sql.NullString
  91. err := rows.Scan(&licUniqueID, &salesPersonName, &operationsPersonName)
  92. if err != nil {
  93. Logger.Error("插入到目标库错误 insertLicToUser find newdata Failed to scan row:", err)
  94. continue
  95. }
  96. //将获取到的字段,和用户表关联对比
  97. var userUNIQUEID, userACCOUNT string
  98. queryLIC_USER := `SELECT UNIQUEID, ACCOUNT FROM LIC_USER where
  99. (USERNAME = ? OR USERNAME = ? )
  100. and (role = 'ADMIN' OR role = 'supportRole' OR role = 'salesRole');`
  101. err = targetDB.QueryRow(queryLIC_USER, salesPersonName, operationsPersonName).Scan(&userUNIQUEID, &userACCOUNT)
  102. if err == sql.ErrNoRows {
  103. Logger.Warn("No rows were returned")
  104. continue
  105. }
  106. if err != nil {
  107. Logger.Error("find user name Failed to scan row :", err)
  108. continue
  109. }
  110. //fmt.Println("关联到用户 ", userUNIQUEID, userACCOUNT)
  111. insertQuery := "INSERT INTO LICENSERECORDTOUSER(LICENSE_UNIQUEID,USER_UNIQUEID,USER_ACCOUNT,OPERATOR_UNIQUEID,UP_TIME) VALUES(?,?,?,'SYSTEM_AUTO',?)"
  112. _, err = targetDB.Exec(insertQuery,
  113. licUniqueID,
  114. userUNIQUEID,
  115. userACCOUNT,
  116. time.Now(),
  117. )
  118. if err != nil {
  119. Logger.Error("lic关联到目标库用户错误 Failed to sql2 insert record into target DB:", err)
  120. }
  121. }
  122. return nil
  123. }
  124. // CaptureOnce 主动触发一次数据抓取
  125. func CaptureOnce(sourceDB, targetDB *sql.DB) error {
  126. tableName := "target_OA_license"
  127. // 读取上次同步时间
  128. lastSyncTime, err := getLastSyncTime(targetDB, tableName)
  129. if err != nil {
  130. Logger.Error("Failed to retrieve last sync time from target DB:", err)
  131. return err
  132. }
  133. // 执行一次数据抓取
  134. lastSyncTime, count, err := captureData(sourceDB, targetDB, lastSyncTime)
  135. if err != nil {
  136. Logger.Error("Failed to capture data:", err)
  137. return err
  138. }
  139. if err := insertSyncMetadata(targetDB, tableName, lastSyncTime); err != nil {
  140. Logger.Error("Failed to insert sync metadata into target DB:", err)
  141. return err
  142. }
  143. Logger.Println("Captured", count, "records.")
  144. if count > 0 {
  145. //lic关联用户表
  146. err := insertLicToUser(count, targetDB)
  147. if err != nil {
  148. Logger.Error("insertLicToUser Failed to insert record into target DB:", err)
  149. return err
  150. }
  151. }
  152. return nil
  153. }
  154. // getLastSyncTime 从数据库中获取指定表的上次同步时间
  155. func getLastSyncTime(db *sql.DB, tableName string) (time.Time, error) {
  156. var lastSyncTime time.Time
  157. query := `
  158. SELECT last_sync_time
  159. FROM sync_metadata
  160. WHERE table_name = ?
  161. ORDER BY last_sync_time DESC
  162. LIMIT 1
  163. `
  164. err := db.QueryRow(query, tableName).Scan(&lastSyncTime)
  165. if err != nil {
  166. return time.Time{}, err
  167. }
  168. return lastSyncTime, nil
  169. }
  170. // insertSyncMetadata 插入新的同步元数据
  171. func insertSyncMetadata(db *sql.DB, tableName string, lastSyncTime time.Time) error {
  172. query := `
  173. INSERT INTO sync_metadata (table_name, last_sync_time)
  174. VALUES (?, ?)
  175. `
  176. _, err := db.Exec(query, tableName, lastSyncTime)
  177. return err
  178. }
  179. func checkSourceOaLicTime(sourceDB *sql.DB, lastSyncTime time.Time) (bool, error) {
  180. var checkCount int
  181. checkTime := `SELECT count(1) FROM XUGU.WORKFLOW_REQUESTBASE WR
  182. where
  183. TO_TIMESTAMP(wr.CREATEDATE || ' ' || wr.CREATETIME, 'YYYY-MM-DD HH24:MI:SS') > TO_TIMESTAMP(?, 'YYYY-MM-DD HH24:MI:SS');`
  184. error := sourceDB.QueryRow(checkTime, lastSyncTime).Scan(&checkCount)
  185. if error != nil {
  186. Logger.Println("Failed to check count:", error)
  187. return false, error
  188. }
  189. if checkCount == 0 {
  190. Logger.Println("没有新数据需要同步")
  191. return false, nil
  192. }
  193. return true, nil
  194. }
  195. // captureData 执行数据抓取和同步
  196. func captureData(sourceDB, targetDB *sql.DB, lastSyncTime time.Time) (time.Time, int, error) {
  197. query := `
  198. SELECT
  199. fmd.id ,
  200. fm.REQUESTID,
  201. wr.REQUESTNAME,
  202. wr.REQUESTNAMENEW,
  203. wr.REQUESTNAMEHTMLNEW,
  204. fm.glxm,
  205. PP.NAME,
  206. fm.SQSJ,
  207. hrm1.LASTNAME,
  208. hrm2.LASTNAME,
  209. fm.XSJSYX,
  210. fm.JFJSYX,
  211. fm.SYDW,
  212. fm.XMXXMS,
  213. fm.JDS,
  214. fmd.JDS,
  215. ws1.SELECTVALUE,
  216. ws1.SELECTNAME,
  217. ws2.SELECTNAME,
  218. fmd.CLQ,
  219. fmd.CZXT,
  220. fmd.IP,
  221. fmd.MAC,
  222. wr.CREATEDATE,
  223. wr.CREATETIME,
  224. wr.LASTOPERATEDATE,
  225. wr.LASTOPERATETIME
  226. FROM XUGU.formtable_main_146 fm
  227. LEFT JOIN XUGU.HRMRESOURCE hrm1 ON TO_NUMBER(fm.XSRY) = hrm1.id
  228. LEFT JOIN XUGU.HRMRESOURCE hrm2 ON TO_NUMBER(fm.jfry) = hrm2.id
  229. LEFT JOIN XUGU.FORMTABLE_MAIN_146_dt1 fmd ON fmd.mainid = fm.id
  230. LEFT JOIN XUGU.WORKFLOW_SELECTITEM ws1 ON fmd.cpmc = ws1.SELECTVALUE AND ws1.FIELDID = 14627
  231. LEFT JOIN XUGU.WORKFLOW_SELECTITEM ws2 ON fmd.BB = ws2.SELECTVALUE AND ws2.FIELDID = 14628
  232. LEFT JOIN XUGU.WORKFLOW_REQUESTBASE WR ON fm.REQUESTID = WR.REQUESTID
  233. LEFT JOIN XUGU.PRJ_PROJECTINFO PP ON fm.glxm = PP.ID
  234. WHERE TO_TIMESTAMP(wr.CREATEDATE || ' ' || wr.CREATETIME, 'YYYY-MM-DD HH24:MI:SS') > TO_TIMESTAMP(?, 'YYYY-MM-DD HH24:MI:SS')
  235. ORDER BY wr.CREATEDATE DESC, wr.CREATETIME DESC
  236. PARALLEL 8`
  237. var count int
  238. rows, err := sourceDB.Query(query, lastSyncTime.Format("2006-01-02 15:04:05"))
  239. if err != nil {
  240. Logger.Println("Failed to execute query on source DB:", err)
  241. return lastSyncTime, 0, err
  242. }
  243. defer rows.Close()
  244. var newSyncTime time.Time
  245. firstRecord := true
  246. for rows.Next() {
  247. var record OALicRecord
  248. count++
  249. if err := rows.Scan(
  250. &record.OA_ID,
  251. &record.OA_REQUESTID,
  252. &record.OA_REQUESTNAME,
  253. &record.OA_REQUESTNAMENEW,
  254. &record.OA_REQUESTNAMEHTMLNEW,
  255. &record.OA_GLXMID,
  256. &record.OA_GLXMNAME,
  257. &record.OA_SQSJ,
  258. &record.OA_SALESPERSONNAME,
  259. &record.OA_OPERATIONSPERSONNAME,
  260. &record.OA_XSJSYX,
  261. &record.OA_JFJSYX,
  262. &record.OA_SYDW,
  263. &record.OA_XMXXMS,
  264. &record.OA_JDS,
  265. &record.OA_NODECOUNT,
  266. &record.OA_PRODUCTCODE,
  267. &record.OA_PRODUCTNAME,
  268. &record.OA_PRODUCTVERSION,
  269. &record.OA_CPU,
  270. &record.OA_OPERATINGSYSTEM,
  271. &record.OA_MAINMAC,
  272. &record.OA_SECONDMAC,
  273. &record.OA_CREATIONDATE,
  274. &record.OA_CREATIONTIME,
  275. &record.OA_LASTOPERATEDATE,
  276. &record.OA_LASTOPERATETIME,
  277. ); err != nil {
  278. log.Println("Failed to scan row from source DB:", err)
  279. return lastSyncTime, 0, err
  280. }
  281. record.CaptureTime = sql.NullTime{
  282. Time: time.Now(),
  283. Valid: true,
  284. }
  285. // 插入新数据到目标数据库
  286. err := insertOALicRecordIntoTargetDB(targetDB, record)
  287. if err != nil {
  288. Logger.Println("Failed to insert record into target DB:", err)
  289. return lastSyncTime, 0, err
  290. }
  291. //fmt.Printf("Inserted record with OA_REQUESTID %d into target DB.\n", record.OA_REQUESTID.Int32)
  292. // 在第一次迭代时更新 newSyncTime 为当前记录的创建时间
  293. if firstRecord && record.OA_CREATIONDATE.Valid && record.OA_CREATIONTIME.Valid {
  294. crDate, _ := time.Parse("2006-01-02", record.OA_CREATIONDATE.String)
  295. crTime, _ := time.Parse("15:04:05", record.OA_CREATIONTIME.String)
  296. newSyncTime = time.Date(crDate.Year(), crDate.Month(), crDate.Day(), crTime.Hour(), crTime.Minute(), crTime.Second(), 0, time.Local)
  297. firstRecord = false
  298. }
  299. }
  300. // 如果有新数据同步,则更新 lastSyncTime
  301. if !newSyncTime.IsZero() {
  302. lastSyncTime = newSyncTime
  303. }
  304. return lastSyncTime, count, err
  305. }
  306. // insertOALicRecordIntoTargetDB 插入新记录到目标数据库
  307. func insertOALicRecordIntoTargetDB(targetDB *sql.DB, record OALicRecord) error {
  308. //fmt.Println("插入新记录到目标数据库")
  309. tx, err := targetDB.Begin()
  310. if err != nil {
  311. return fmt.Errorf("begin transaction: %v", err)
  312. }
  313. defer func() {
  314. if err != nil {
  315. tx.Rollback()
  316. } else {
  317. err = tx.Commit()
  318. }
  319. }()
  320. uniqueID := uuid.New()
  321. sql1 := `
  322. INSERT INTO target_OA_license (
  323. Unique_ID,
  324. OA_ID,
  325. OA_REQUESTID,
  326. OA_REQUESTNAME,
  327. OA_REQUESTNAMENEW,
  328. OA_REQUESTNAMEHTMLNEW,
  329. OA_GLXMID,
  330. OA_GLXMNAME,
  331. OA_SQSJ,
  332. OA_SALESPERSONNAME,
  333. OA_XSJSYX,
  334. OA_OPERATIONSPERSONNAME,
  335. OA_JFJSYX,
  336. OA_SYDW,
  337. OA_XMXXMS,
  338. OA_JDS,
  339. OA_NODECOUNT,
  340. OA_PRODUCTCODE,
  341. OA_PRODUCTNAME,
  342. OA_PRODUCTVERSION,
  343. OA_CPU,
  344. OA_OPERATINGSYSTEM,
  345. OA_MAINMAC,
  346. OA_SECONDMAC,
  347. OA_CREATIONDATE,
  348. OA_CREATIONTIME,
  349. OA_LASTOPERATEDATE,
  350. OA_LASTOPERATETIME,
  351. capture_Time
  352. ) VALUES (?,?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?,? ,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?, ?)`
  353. _, err = tx.Exec(sql1,
  354. uniqueID,
  355. record.OA_ID,
  356. record.OA_REQUESTID,
  357. record.OA_REQUESTNAME,
  358. record.OA_REQUESTNAMENEW,
  359. record.OA_REQUESTNAMEHTMLNEW,
  360. record.OA_GLXMID,
  361. record.OA_GLXMNAME,
  362. record.OA_SQSJ,
  363. record.OA_SALESPERSONNAME,
  364. record.OA_XSJSYX,
  365. record.OA_OPERATIONSPERSONNAME,
  366. record.OA_JFJSYX,
  367. record.OA_SYDW,
  368. record.OA_XMXXMS,
  369. record.OA_JDS,
  370. record.OA_NODECOUNT,
  371. record.OA_PRODUCTCODE,
  372. record.OA_PRODUCTNAME,
  373. record.OA_PRODUCTVERSION,
  374. record.OA_CPU,
  375. record.OA_OPERATINGSYSTEM,
  376. record.OA_MAINMAC,
  377. record.OA_SECONDMAC,
  378. record.OA_CREATIONDATE,
  379. record.OA_CREATIONTIME,
  380. record.OA_LASTOPERATEDATE,
  381. record.OA_LASTOPERATETIME,
  382. record.CaptureTime,
  383. )
  384. if err != nil {
  385. log.Println("Failed to sql1 insert record into target DB:", err)
  386. }
  387. sql2 := `
  388. INSERT INTO License_generate_Info (
  389. License_UniqueID,
  390. OA_ID,
  391. License_Flage)
  392. VALUES (?, ?,?)
  393. `
  394. _, err = tx.Exec(sql2,
  395. uniqueID,
  396. record.OA_ID,
  397. "未生成",
  398. )
  399. if err != nil {
  400. Logger.Println("Failed to sql2 insert record into target DB:", err)
  401. return err
  402. }
  403. return nil
  404. }