capture.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420
  1. package capture
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "log"
  6. "time"
  7. "github.com/google/uuid"
  8. )
  9. type OALicRecord struct {
  10. OA_ID sql.NullInt32 `json:"OA_ID"`
  11. OA_REQUESTID sql.NullInt32 `json:"oa_request_id"`
  12. OA_REQUESTNAME sql.NullString `json:"oa_request_name"`
  13. OA_REQUESTNAMENEW sql.NullString `json:"oa_request_name_new"`
  14. OA_REQUESTNAMEHTMLNEW sql.NullString `json:"oa_request_name_html_new"`
  15. OA_GLXMID sql.NullString `json:"oa_glxmid"`
  16. OA_GLXMNAME sql.NullString `json:"oa_glxmname"`
  17. OA_SQSJ sql.NullString `json:"oa_sqsj"`
  18. OA_SALESPERSONNAME sql.NullString `json:"oa_sales_person_name"`
  19. OA_XSJSYX sql.NullString `json:"oa_sales_email"`
  20. OA_OPERATIONSPERSONNAME sql.NullString `json:"oa_operations_person_name"`
  21. OA_JFJSYX sql.NullString `json:"oa_operations_email"`
  22. OA_SYDW sql.NullString `json:"oa_using_unit"`
  23. OA_XMXXMS sql.NullString `json:"oa_project_detail"`
  24. OA_JDS sql.NullInt32 `json:"oa_total_nodes"`
  25. OA_NODECOUNT sql.NullInt32 `json:"oa_node_count"`
  26. OA_PRODUCTCODE sql.NullString `json:"oa_product_code"`
  27. OA_PRODUCTNAME sql.NullString `json:"oa_product_name"`
  28. OA_PRODUCTVERSION sql.NullString `json:"oa_product_version"`
  29. OA_CPU sql.NullString `json:"oa_cpu"`
  30. OA_OPERATINGSYSTEM sql.NullString `json:"oa_operating_system"`
  31. OA_MAINMAC sql.NullString `json:"oa_main_mac"`
  32. OA_SECONDMAC sql.NullString `json:"oa_secondary_mac"`
  33. OA_CREATIONDATE sql.NullString `json:"oa_creation_date"`
  34. OA_CREATIONTIME sql.NullString `json:"oa_creation_time"`
  35. OA_LASTOPERATEDATE sql.NullString `json:"oa_last_operation_date"`
  36. OA_LASTOPERATETIME sql.NullString `json:"oa_last_operation_time"`
  37. CaptureTime sql.NullTime `json:"capture_time"`
  38. }
  39. func StartCapture(sourceDB, targetDB *sql.DB) {
  40. tableName := "target_OA_license"
  41. // 读取上次同步时间
  42. lastSyncTime, err := getLastSyncTime(targetDB, tableName)
  43. if err != nil {
  44. log.Fatal("Failed to retrieve last sync time from target DB:", err)
  45. }
  46. // 启动数据同步,每隔15分钟运行一次
  47. for {
  48. fmt.Println("Start capturing data...")
  49. checkTemp, err := checkSourceOaLicTime(sourceDB, lastSyncTime)
  50. if err != nil {
  51. log.Println("Failed to checkSourceOaLicTime:", err)
  52. }
  53. if !checkTemp {
  54. time.Sleep(15 * time.Second)
  55. continue
  56. }
  57. fmt.Println("有新数据")
  58. lastSyncTime1, count, err := captureData(sourceDB, targetDB, lastSyncTime)
  59. lastSyncTime = lastSyncTime1
  60. if err != nil {
  61. log.Println("Failed to capture data:", err)
  62. }
  63. fmt.Println("抓完一次数据")
  64. if err := insertSyncMetadata(targetDB, tableName, lastSyncTime); err != nil {
  65. log.Println("Failed to insert sync metadata into target DB:", err)
  66. }
  67. fmt.Println("插入时间: ", lastSyncTime)
  68. if count > 0 {
  69. //lic关联用户表
  70. insertLicToUser(count, targetDB)
  71. }
  72. time.Sleep(15 * time.Second)
  73. }
  74. }
  75. func insertLicToUser(count int, targetDB *sql.DB) error {
  76. //查询抓取表中最新的count条数据
  77. query := "SELECT UNIQUE_ID,OA_SALESPERSONNAME, OA_OPERATIONSPERSONNAME FROM target_OA_license ORDER BY capture_time DESC LIMIT ?"
  78. rows, err := targetDB.Query(query, count)
  79. if err != nil {
  80. log.Println("Failed to execute query on source DB:", err)
  81. return err
  82. }
  83. defer rows.Close()
  84. fmt.Println("查询到最新的count条数据 ", count)
  85. for rows.Next() {
  86. var licUniqueID, salesPersonName, operationsPersonName sql.NullString
  87. err := rows.Scan(&licUniqueID, &salesPersonName, &operationsPersonName)
  88. if err != nil {
  89. log.Println("insertLicToUser find newdata Failed to scan row:", err)
  90. continue
  91. }
  92. //将获取到的字段,和用户表关联对比
  93. var userUNIQUEID, userACCOUNT string
  94. queryLIC_USER := `SELECT UNIQUEID, ACCOUNT FROM LIC_USER where
  95. (USERNAME = ? OR USERNAME = ? )
  96. and (role = 'ADMIN' OR role = 'supportRole' OR role = 'salesRole');`
  97. err = targetDB.QueryRow(queryLIC_USER, salesPersonName, operationsPersonName).Scan(&userUNIQUEID, &userACCOUNT)
  98. if err != nil {
  99. log.Println("find user name Failed to scan row :", err)
  100. continue
  101. }
  102. fmt.Println("salesPersonName and operationsPersonName ", salesPersonName, operationsPersonName)
  103. fmt.Println("userUNIQUEID and userACCOUNT ", userUNIQUEID, userACCOUNT)
  104. //fmt.Println("关联到用户 ", userUNIQUEID, userACCOUNT)
  105. insertQuery := "INSERT INTO LICENSERECORDTOUSER(LICENSE_UNIQUEID,USER_UNIQUEID,USER_ACCOUNT,OPERATOR_UNIQUEID,UP_TIME) VALUES(?,?,?,'SYSTEM_AUTO',?)"
  106. _, err = targetDB.Exec(insertQuery,
  107. licUniqueID,
  108. userUNIQUEID,
  109. userACCOUNT,
  110. time.Now(),
  111. )
  112. if err != nil {
  113. log.Println("Failed to sql2 insert record into target DB:", err)
  114. }
  115. }
  116. return nil
  117. }
  118. // CaptureOnce 主动触发一次数据抓取
  119. func CaptureOnce(sourceDB, targetDB *sql.DB) {
  120. tableName := "target_OA_license"
  121. // 读取上次同步时间
  122. lastSyncTime, err := getLastSyncTime(targetDB, tableName)
  123. if err != nil {
  124. log.Println("Failed to retrieve last sync time from target DB:", err)
  125. return
  126. }
  127. // 执行一次数据抓取
  128. lastSyncTime, count, err := captureData(sourceDB, targetDB, lastSyncTime)
  129. if err := insertSyncMetadata(targetDB, tableName, lastSyncTime); err != nil {
  130. log.Println("Failed to insert sync metadata into target DB:", err)
  131. }
  132. fmt.Println("Captured", count, "records.")
  133. }
  134. // getLastSyncTime 从数据库中获取指定表的上次同步时间
  135. func getLastSyncTime(db *sql.DB, tableName string) (time.Time, error) {
  136. var lastSyncTime time.Time
  137. query := `
  138. SELECT last_sync_time
  139. FROM sync_metadata
  140. WHERE table_name = ?
  141. ORDER BY last_sync_time DESC
  142. LIMIT 1
  143. `
  144. err := db.QueryRow(query, tableName).Scan(&lastSyncTime)
  145. if err != nil {
  146. return time.Time{}, err
  147. }
  148. return lastSyncTime, nil
  149. }
  150. // insertSyncMetadata 插入新的同步元数据
  151. func insertSyncMetadata(db *sql.DB, tableName string, lastSyncTime time.Time) error {
  152. query := `
  153. INSERT INTO sync_metadata (table_name, last_sync_time)
  154. VALUES (?, ?)
  155. `
  156. _, err := db.Exec(query, tableName, lastSyncTime)
  157. return err
  158. }
  159. func checkSourceOaLicTime(sourceDB *sql.DB, lastSyncTime time.Time) (bool, error) {
  160. var checkCount int
  161. checkTime := `SELECT count(1) FROM XUGU.WORKFLOW_REQUESTBASE WR
  162. where
  163. TO_TIMESTAMP(wr.CREATEDATE || ' ' || wr.CREATETIME, 'YYYY-MM-DD HH24:MI:SS') > TO_TIMESTAMP(?, 'YYYY-MM-DD HH24:MI:SS');`
  164. error := sourceDB.QueryRow(checkTime, lastSyncTime).Scan(&checkCount)
  165. if error != nil {
  166. log.Println("Failed to check count:", error)
  167. return false, error
  168. }
  169. if checkCount == 0 {
  170. fmt.Println("没有新数据需要同步")
  171. return false, nil
  172. }
  173. return true, nil
  174. }
  175. // captureData 执行数据抓取和同步
  176. func captureData(sourceDB, targetDB *sql.DB, lastSyncTime time.Time) (time.Time, int, error) {
  177. query := `
  178. SELECT
  179. fmd.id ,
  180. fm.REQUESTID,
  181. wr.REQUESTNAME,
  182. wr.REQUESTNAMENEW,
  183. wr.REQUESTNAMEHTMLNEW,
  184. fm.glxm,
  185. PP.NAME,
  186. fm.SQSJ,
  187. hrm1.LASTNAME,
  188. hrm2.LASTNAME,
  189. fm.XSJSYX,
  190. fm.JFJSYX,
  191. fm.SYDW,
  192. fm.XMXXMS,
  193. fm.JDS,
  194. fmd.JDS,
  195. ws1.SELECTVALUE,
  196. ws1.SELECTNAME,
  197. ws2.SELECTNAME,
  198. fmd.CLQ,
  199. fmd.CZXT,
  200. fmd.IP,
  201. fmd.MAC,
  202. wr.CREATEDATE,
  203. wr.CREATETIME,
  204. wr.LASTOPERATEDATE,
  205. wr.LASTOPERATETIME
  206. FROM XUGU.formtable_main_146 fm
  207. LEFT JOIN XUGU.HRMRESOURCE hrm1 ON TO_NUMBER(fm.XSRY) = hrm1.id
  208. LEFT JOIN XUGU.HRMRESOURCE hrm2 ON TO_NUMBER(fm.jfry) = hrm2.id
  209. LEFT JOIN XUGU.FORMTABLE_MAIN_146_dt1 fmd ON fmd.mainid = fm.id
  210. LEFT JOIN XUGU.WORKFLOW_SELECTITEM ws1 ON fmd.cpmc = ws1.SELECTVALUE AND ws1.FIELDID = 14627
  211. LEFT JOIN XUGU.WORKFLOW_SELECTITEM ws2 ON fmd.BB = ws2.SELECTVALUE AND ws2.FIELDID = 14628
  212. LEFT JOIN XUGU.WORKFLOW_REQUESTBASE WR ON fm.REQUESTID = WR.REQUESTID
  213. LEFT JOIN XUGU.PRJ_PROJECTINFO PP ON fm.glxm = PP.ID
  214. WHERE TO_TIMESTAMP(wr.CREATEDATE || ' ' || wr.CREATETIME, 'YYYY-MM-DD HH24:MI:SS') > TO_TIMESTAMP(?, 'YYYY-MM-DD HH24:MI:SS')
  215. ORDER BY wr.CREATEDATE DESC, wr.CREATETIME DESC
  216. PARALLEL 8`
  217. var count int
  218. rows, err := sourceDB.Query(query, lastSyncTime.Format("2006-01-02 15:04:05"))
  219. if err != nil {
  220. log.Println("Failed to execute query on source DB:", err)
  221. return lastSyncTime, 0, err
  222. }
  223. defer rows.Close()
  224. var newSyncTime time.Time
  225. firstRecord := true
  226. for rows.Next() {
  227. var record OALicRecord
  228. count++
  229. if err := rows.Scan(
  230. &record.OA_ID,
  231. &record.OA_REQUESTID,
  232. &record.OA_REQUESTNAME,
  233. &record.OA_REQUESTNAMENEW,
  234. &record.OA_REQUESTNAMEHTMLNEW,
  235. &record.OA_GLXMID,
  236. &record.OA_GLXMNAME,
  237. &record.OA_SQSJ,
  238. &record.OA_SALESPERSONNAME,
  239. &record.OA_OPERATIONSPERSONNAME,
  240. &record.OA_XSJSYX,
  241. &record.OA_JFJSYX,
  242. &record.OA_SYDW,
  243. &record.OA_XMXXMS,
  244. &record.OA_JDS,
  245. &record.OA_NODECOUNT,
  246. &record.OA_PRODUCTCODE,
  247. &record.OA_PRODUCTNAME,
  248. &record.OA_PRODUCTVERSION,
  249. &record.OA_CPU,
  250. &record.OA_OPERATINGSYSTEM,
  251. &record.OA_MAINMAC,
  252. &record.OA_SECONDMAC,
  253. &record.OA_CREATIONDATE,
  254. &record.OA_CREATIONTIME,
  255. &record.OA_LASTOPERATEDATE,
  256. &record.OA_LASTOPERATETIME,
  257. ); err != nil {
  258. log.Println("Failed to scan row from source DB:", err)
  259. return lastSyncTime, 0, err
  260. }
  261. record.CaptureTime = sql.NullTime{
  262. Time: time.Now(),
  263. Valid: true,
  264. }
  265. // 插入新数据到目标数据库
  266. err := insertOALicRecordIntoTargetDB(targetDB, record)
  267. if err != nil {
  268. log.Println("Failed to insert record into target DB:", err)
  269. return lastSyncTime, 0, err
  270. }
  271. //fmt.Printf("Inserted record with OA_REQUESTID %d into target DB.\n", record.OA_REQUESTID.Int32)
  272. // 在第一次迭代时更新 newSyncTime 为当前记录的创建时间
  273. if firstRecord && record.OA_CREATIONDATE.Valid && record.OA_CREATIONTIME.Valid {
  274. crDate, _ := time.Parse("2006-01-02", record.OA_CREATIONDATE.String)
  275. crTime, _ := time.Parse("15:04:05", record.OA_CREATIONTIME.String)
  276. newSyncTime = time.Date(crDate.Year(), crDate.Month(), crDate.Day(), crTime.Hour(), crTime.Minute(), crTime.Second(), 0, time.Local)
  277. firstRecord = false
  278. }
  279. }
  280. // 如果有新数据同步,则更新 lastSyncTime
  281. if !newSyncTime.IsZero() {
  282. lastSyncTime = newSyncTime
  283. }
  284. return lastSyncTime, count, err
  285. }
  286. // insertOALicRecordIntoTargetDB 插入新记录到目标数据库
  287. func insertOALicRecordIntoTargetDB(targetDB *sql.DB, record OALicRecord) error {
  288. //fmt.Println("插入新记录到目标数据库")
  289. tx, err := targetDB.Begin()
  290. if err != nil {
  291. return fmt.Errorf("begin transaction: %v", err)
  292. }
  293. defer func() {
  294. if err != nil {
  295. tx.Rollback()
  296. } else {
  297. err = tx.Commit()
  298. }
  299. }()
  300. uniqueID := uuid.New()
  301. sql1 := `
  302. INSERT INTO target_OA_license (
  303. Unique_ID,
  304. OA_ID,
  305. OA_REQUESTID,
  306. OA_REQUESTNAME,
  307. OA_REQUESTNAMENEW,
  308. OA_REQUESTNAMEHTMLNEW,
  309. OA_GLXMID,
  310. OA_GLXMNAME,
  311. OA_SQSJ,
  312. OA_SALESPERSONNAME,
  313. OA_XSJSYX,
  314. OA_OPERATIONSPERSONNAME,
  315. OA_JFJSYX,
  316. OA_SYDW,
  317. OA_XMXXMS,
  318. OA_JDS,
  319. OA_NODECOUNT,
  320. OA_PRODUCTCODE,
  321. OA_PRODUCTNAME,
  322. OA_PRODUCTVERSION,
  323. OA_CPU,
  324. OA_OPERATINGSYSTEM,
  325. OA_MAINMAC,
  326. OA_SECONDMAC,
  327. OA_CREATIONDATE,
  328. OA_CREATIONTIME,
  329. OA_LASTOPERATEDATE,
  330. OA_LASTOPERATETIME,
  331. capture_Time
  332. ) VALUES (?,?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?,? ,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?, ?)`
  333. _, err = tx.Exec(sql1,
  334. uniqueID,
  335. record.OA_ID,
  336. record.OA_REQUESTID,
  337. record.OA_REQUESTNAME,
  338. record.OA_REQUESTNAMENEW,
  339. record.OA_REQUESTNAMEHTMLNEW,
  340. record.OA_GLXMID,
  341. record.OA_GLXMNAME,
  342. record.OA_SQSJ,
  343. record.OA_SALESPERSONNAME,
  344. record.OA_XSJSYX,
  345. record.OA_OPERATIONSPERSONNAME,
  346. record.OA_JFJSYX,
  347. record.OA_SYDW,
  348. record.OA_XMXXMS,
  349. record.OA_JDS,
  350. record.OA_NODECOUNT,
  351. record.OA_PRODUCTCODE,
  352. record.OA_PRODUCTNAME,
  353. record.OA_PRODUCTVERSION,
  354. record.OA_CPU,
  355. record.OA_OPERATINGSYSTEM,
  356. record.OA_MAINMAC,
  357. record.OA_SECONDMAC,
  358. record.OA_CREATIONDATE,
  359. record.OA_CREATIONTIME,
  360. record.OA_LASTOPERATEDATE,
  361. record.OA_LASTOPERATETIME,
  362. record.CaptureTime,
  363. )
  364. if err != nil {
  365. log.Println("Failed to sql1 insert record into target DB:", err)
  366. }
  367. sql2 := `
  368. INSERT INTO License_generate_Info (
  369. License_UniqueID,
  370. OA_ID,
  371. License_Flage)
  372. VALUES (?, ?,?)
  373. `
  374. _, err = tx.Exec(sql2,
  375. uniqueID,
  376. record.OA_ID,
  377. "未生成",
  378. )
  379. if err != nil {
  380. log.Println("Failed to sql2 insert record into target DB:", err)
  381. return err
  382. }
  383. return nil
  384. }