capture.go 12 KB


  1. package capture
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "log"
  6. "time"
  7. "github.com/google/uuid"
  8. )
  9. type OALicRecord struct {
  10. OA_ID sql.NullInt32 `json:"OA_ID"`
  11. OA_REQUESTID sql.NullInt32 `json:"oa_request_id"`
  12. OA_REQUESTNAME sql.NullString `json:"oa_request_name"`
  13. OA_REQUESTNAMENEW sql.NullString `json:"oa_request_name_new"`
  14. OA_REQUESTNAMEHTMLNEW sql.NullString `json:"oa_request_name_html_new"`
  15. OA_GLXMID sql.NullString `json:"oa_glxmid"`
  16. OA_GLXMNAME sql.NullString `json:"oa_glxmname"`
  17. OA_SQSJ sql.NullString `json:"oa_sqsj"`
  18. OA_SALESPERSONNAME sql.NullString `json:"oa_sales_person_name"`
  19. OA_XSJSYX sql.NullString `json:"oa_sales_email"`
  20. OA_OPERATIONSPERSONNAME sql.NullString `json:"oa_operations_person_name"`
  21. OA_JFJSYX sql.NullString `json:"oa_operations_email"`
  22. OA_SYDW sql.NullString `json:"oa_using_unit"`
  23. OA_XMXXMS sql.NullString `json:"oa_project_detail"`
  24. OA_JDS sql.NullInt32 `json:"oa_total_nodes"`
  25. OA_NODECOUNT sql.NullInt32 `json:"oa_node_count"`
  26. OA_PRODUCTCODE sql.NullString `json:"oa_product_code"`
  27. OA_PRODUCTNAME sql.NullString `json:"oa_product_name"`
  28. OA_PRODUCTVERSION sql.NullString `json:"oa_product_version"`
  29. OA_CPU sql.NullString `json:"oa_cpu"`
  30. OA_OPERATINGSYSTEM sql.NullString `json:"oa_operating_system"`
  31. OA_MAINMAC sql.NullString `json:"oa_main_mac"`
  32. OA_SECONDMAC sql.NullString `json:"oa_secondary_mac"`
  33. OA_CREATIONDATE sql.NullString `json:"oa_creation_date"`
  34. OA_CREATIONTIME sql.NullString `json:"oa_creation_time"`
  35. OA_LASTOPERATEDATE sql.NullString `json:"oa_last_operation_date"`
  36. OA_LASTOPERATETIME sql.NullString `json:"oa_last_operation_time"`
  37. CaptureTime sql.NullTime `json:"capture_time"`
  38. }
  39. func StartCapture(sourceDB, targetDB *sql.DB) {
  40. tableName := "target_OA_license"
  41. // 读取上次同步时间
  42. lastSyncTime, err := getLastSyncTime(targetDB, tableName)
  43. if err != nil {
  44. log.Fatal("Failed to retrieve last sync time from target DB:", err)
  45. }
  46. // 启动数据同步,每隔15分钟运行一次
  47. for {
  48. fmt.Println("Start capturing data...")
  49. checkTemp, err := checkSourceOaLicTime(sourceDB, lastSyncTime)
  50. if err != nil {
  51. log.Println("Failed to checkSourceOaLicTime:", err)
  52. }
  53. if !checkTemp {
  54. time.Sleep(15 * time.Second)
  55. continue
  56. }
  57. fmt.Println("有新数据")
  58. lastSyncTime1, count, err := captureData(sourceDB, targetDB, lastSyncTime)
  59. lastSyncTime = lastSyncTime1
  60. if err != nil {
  61. log.Println("Failed to capture data:", err)
  62. }
  63. fmt.Println("抓完一次数据")
  64. if err := insertSyncMetadata(targetDB, tableName, lastSyncTime); err != nil {
  65. log.Println("Failed to insert sync metadata into target DB:", err)
  66. }
  67. fmt.Println("插入时间: ", lastSyncTime)
  68. if count > 0 {
  69. //lic关联用户表
  70. insertLicToUser(count, targetDB)
  71. fmt.Println("插入关联用户: ", count)
  72. }
  73. time.Sleep(15 * time.Second)
  74. }
  75. }
  76. func insertLicToUser(count int, targetDB *sql.DB) {
  77. //查询抓取表中最新的count条数据
  78. query := "SELECT UNIQUE_ID,OA_SALESPERSONNAME, OA_OPERATIONSPERSONNAME FROM target_OA_license ORDER BY capture_time DESC LIMIT ?"
  79. rows, err := targetDB.Query(query, count)
  80. if err != nil {
  81. log.Println("Failed to execute query on source DB:", err)
  82. }
  83. defer rows.Close()
  84. for rows.Next() {
  85. var licUniqueID, salesPersonName, operationsPersonName sql.NullString
  86. err := rows.Scan(&licUniqueID, &salesPersonName, &operationsPersonName)
  87. if err != nil {
  88. log.Println("insertLicToUser find newdata Failed to scan row:", err)
  89. continue
  90. }
  91. //将获取到的字段,和用户表关联对比
  92. var userUNIQUEID, userACCOUNT string
  93. query2 := `SELECT UNIQUEID, ACCOUNT FROM LIC_USER where
  94. (USERNAME = ? OR USERNAME = ? )
  95. and role = 'ADMIN' OR role = 'supportRole' OR role = 'salesRole';`
  96. err = targetDB.QueryRow(query2, salesPersonName, operationsPersonName).Scan(&userUNIQUEID, &userACCOUNT)
  97. if err != nil {
  98. log.Println("find user name Failed to scan row :", err)
  99. continue
  100. }
  101. //fmt.Println("关联到用户 ", userUNIQUEID, userACCOUNT)
  102. insertQuery := "INSERT INTO LICENSERECORDTOUSER(LICENSE_UNIQUEID,USER_UNIQUEID,USER_ACCOUNT,OPERATOR_UNIQUEID,UP_TIME) VALUES(?,?,?,'SYSTEM_AUTO',?)"
  103. _, err = targetDB.Exec(insertQuery,
  104. licUniqueID,
  105. userUNIQUEID,
  106. userACCOUNT,
  107. time.Now(),
  108. )
  109. if err != nil {
  110. log.Println("Failed to sql2 insert record into target DB:", err)
  111. }
  112. }
  113. }
  114. // CaptureOnce 主动触发一次数据抓取
  115. func CaptureOnce(sourceDB, targetDB *sql.DB) {
  116. tableName := "target_OA_license"
  117. // 读取上次同步时间
  118. lastSyncTime, err := getLastSyncTime(targetDB, tableName)
  119. if err != nil {
  120. log.Println("Failed to retrieve last sync time from target DB:", err)
  121. return
  122. }
  123. // 执行一次数据抓取
  124. lastSyncTime, count, err := captureData(sourceDB, targetDB, lastSyncTime)
  125. if err := insertSyncMetadata(targetDB, tableName, lastSyncTime); err != nil {
  126. log.Println("Failed to insert sync metadata into target DB:", err)
  127. }
  128. fmt.Println("Captured", count, "records.")
  129. }
  130. // getLastSyncTime 从数据库中获取指定表的上次同步时间
  131. func getLastSyncTime(db *sql.DB, tableName string) (time.Time, error) {
  132. var lastSyncTime time.Time
  133. query := `
  134. SELECT last_sync_time
  135. FROM sync_metadata
  136. WHERE table_name = ?
  137. ORDER BY last_sync_time DESC
  138. LIMIT 1
  139. `
  140. err := db.QueryRow(query, tableName).Scan(&lastSyncTime)
  141. if err != nil {
  142. return time.Time{}, err
  143. }
  144. return lastSyncTime, nil
  145. }
  146. // insertSyncMetadata 插入新的同步元数据
  147. func insertSyncMetadata(db *sql.DB, tableName string, lastSyncTime time.Time) error {
  148. query := `
  149. INSERT INTO sync_metadata (table_name, last_sync_time)
  150. VALUES (?, ?)
  151. `
  152. _, err := db.Exec(query, tableName, lastSyncTime)
  153. return err
  154. }
  155. func checkSourceOaLicTime(sourceDB *sql.DB, lastSyncTime time.Time) (bool, error) {
  156. var checkCount int
  157. checkTime := `SELECT count(1) FROM XUGU.WORKFLOW_REQUESTBASE WR
  158. where
  159. TO_TIMESTAMP(wr.CREATEDATE || ' ' || wr.CREATETIME, 'YYYY-MM-DD HH24:MI:SS') > TO_TIMESTAMP(?, 'YYYY-MM-DD HH24:MI:SS');`
  160. error := sourceDB.QueryRow(checkTime, lastSyncTime).Scan(&checkCount)
  161. if error != nil {
  162. log.Println("Failed to check count:", error)
  163. return false, error
  164. }
  165. if checkCount == 0 {
  166. fmt.Println("没有新数据需要同步")
  167. return false, nil
  168. }
  169. return true, nil
  170. }
  171. // captureData 执行数据抓取和同步
  172. func captureData(sourceDB, targetDB *sql.DB, lastSyncTime time.Time) (time.Time, int, error) {
  173. query := `
  174. SELECT
  175. fmd.id ,
  176. fm.REQUESTID,
  177. wr.REQUESTNAME,
  178. wr.REQUESTNAMENEW,
  179. wr.REQUESTNAMEHTMLNEW,
  180. fm.glxm,
  181. PP.NAME,
  182. fm.SQSJ,
  183. hrm1.LASTNAME,
  184. hrm2.LASTNAME,
  185. fm.XSJSYX,
  186. fm.JFJSYX,
  187. fm.SYDW,
  188. fm.XMXXMS,
  189. fm.JDS,
  190. fmd.JDS,
  191. ws1.SELECTVALUE,
  192. ws1.SELECTNAME,
  193. ws2.SELECTNAME,
  194. fmd.CLQ,
  195. fmd.CZXT,
  196. fmd.IP,
  197. fmd.MAC,
  198. wr.CREATEDATE,
  199. wr.CREATETIME,
  200. wr.LASTOPERATEDATE,
  201. wr.LASTOPERATETIME
  202. FROM XUGU.formtable_main_146 fm
  203. LEFT JOIN XUGU.HRMRESOURCE hrm1 ON TO_NUMBER(fm.XSRY) = hrm1.id
  204. LEFT JOIN XUGU.HRMRESOURCE hrm2 ON TO_NUMBER(fm.jfry) = hrm2.id
  205. LEFT JOIN XUGU.FORMTABLE_MAIN_146_dt1 fmd ON fmd.mainid = fm.id
  206. LEFT JOIN XUGU.WORKFLOW_SELECTITEM ws1 ON fmd.cpmc = ws1.SELECTVALUE AND ws1.FIELDID = 14627
  207. LEFT JOIN XUGU.WORKFLOW_SELECTITEM ws2 ON fmd.BB = ws2.SELECTVALUE AND ws2.FIELDID = 14628
  208. LEFT JOIN XUGU.WORKFLOW_REQUESTBASE WR ON fm.REQUESTID = WR.REQUESTID
  209. LEFT JOIN XUGU.PRJ_PROJECTINFO PP ON fm.glxm = PP.ID
  210. WHERE TO_TIMESTAMP(wr.CREATEDATE || ' ' || wr.CREATETIME, 'YYYY-MM-DD HH24:MI:SS') > TO_TIMESTAMP(?, 'YYYY-MM-DD HH24:MI:SS')
  211. ORDER BY wr.CREATEDATE DESC, wr.CREATETIME DESC
  212. PARALLEL 8`
  213. var count int
  214. rows, err := sourceDB.Query(query, lastSyncTime.Format("2006-01-02 15:04:05"))
  215. if err != nil {
  216. log.Println("Failed to execute query on source DB:", err)
  217. return lastSyncTime, 0, err
  218. }
  219. defer rows.Close()
  220. var newSyncTime time.Time
  221. firstRecord := true
  222. for rows.Next() {
  223. var record OALicRecord
  224. count++
  225. if err := rows.Scan(
  226. &record.OA_ID,
  227. &record.OA_REQUESTID,
  228. &record.OA_REQUESTNAME,
  229. &record.OA_REQUESTNAMENEW,
  230. &record.OA_REQUESTNAMEHTMLNEW,
  231. &record.OA_GLXMID,
  232. &record.OA_GLXMNAME,
  233. &record.OA_SQSJ,
  234. &record.OA_SALESPERSONNAME,
  235. &record.OA_OPERATIONSPERSONNAME,
  236. &record.OA_XSJSYX,
  237. &record.OA_JFJSYX,
  238. &record.OA_SYDW,
  239. &record.OA_XMXXMS,
  240. &record.OA_JDS,
  241. &record.OA_NODECOUNT,
  242. &record.OA_PRODUCTCODE,
  243. &record.OA_PRODUCTNAME,
  244. &record.OA_PRODUCTVERSION,
  245. &record.OA_CPU,
  246. &record.OA_OPERATINGSYSTEM,
  247. &record.OA_MAINMAC,
  248. &record.OA_SECONDMAC,
  249. &record.OA_CREATIONDATE,
  250. &record.OA_CREATIONTIME,
  251. &record.OA_LASTOPERATEDATE,
  252. &record.OA_LASTOPERATETIME,
  253. ); err != nil {
  254. log.Println("Failed to scan row from source DB:", err)
  255. return lastSyncTime, 0, err
  256. }
  257. record.CaptureTime = sql.NullTime{
  258. Time: time.Now(),
  259. Valid: true,
  260. }
  261. // 插入新数据到目标数据库
  262. err := insertOALicRecordIntoTargetDB(targetDB, record)
  263. if err != nil {
  264. log.Println("Failed to insert record into target DB:", err)
  265. return lastSyncTime, 0, err
  266. }
  267. //fmt.Printf("Inserted record with OA_REQUESTID %d into target DB.\n", record.OA_REQUESTID.Int32)
  268. // 在第一次迭代时更新 newSyncTime 为当前记录的创建时间
  269. if firstRecord && record.OA_CREATIONDATE.Valid && record.OA_CREATIONTIME.Valid {
  270. crDate, _ := time.Parse("2006-01-02", record.OA_CREATIONDATE.String)
  271. crTime, _ := time.Parse("15:04:05", record.OA_CREATIONTIME.String)
  272. newSyncTime = time.Date(crDate.Year(), crDate.Month(), crDate.Day(), crTime.Hour(), crTime.Minute(), crTime.Second(), 0, time.Local)
  273. firstRecord = false
  274. }
  275. }
  276. // 如果有新数据同步,则更新 lastSyncTime
  277. if !newSyncTime.IsZero() {
  278. lastSyncTime = newSyncTime
  279. }
  280. return lastSyncTime, count, err
  281. }
  282. // insertOALicRecordIntoTargetDB 插入新记录到目标数据库
  283. func insertOALicRecordIntoTargetDB(targetDB *sql.DB, record OALicRecord) error {
  284. //fmt.Println("插入新记录到目标数据库")
  285. tx, err := targetDB.Begin()
  286. if err != nil {
  287. return fmt.Errorf("begin transaction: %v", err)
  288. }
  289. defer func() {
  290. if err != nil {
  291. tx.Rollback()
  292. } else {
  293. err = tx.Commit()
  294. }
  295. }()
  296. uniqueID := uuid.New()
  297. sql1 := `
  298. INSERT INTO target_OA_license (
  299. Unique_ID,
  300. OA_ID,
  301. OA_REQUESTID,
  302. OA_REQUESTNAME,
  303. OA_REQUESTNAMENEW,
  304. OA_REQUESTNAMEHTMLNEW,
  305. OA_GLXMID,
  306. OA_GLXMNAME,
  307. OA_SQSJ,
  308. OA_SALESPERSONNAME,
  309. OA_XSJSYX,
  310. OA_OPERATIONSPERSONNAME,
  311. OA_JFJSYX,
  312. OA_SYDW,
  313. OA_XMXXMS,
  314. OA_JDS,
  315. OA_NODECOUNT,
  316. OA_PRODUCTCODE,
  317. OA_PRODUCTNAME,
  318. OA_PRODUCTVERSION,
  319. OA_CPU,
  320. OA_OPERATINGSYSTEM,
  321. OA_MAINMAC,
  322. OA_SECONDMAC,
  323. OA_CREATIONDATE,
  324. OA_CREATIONTIME,
  325. OA_LASTOPERATEDATE,
  326. OA_LASTOPERATETIME,
  327. capture_Time
  328. ) VALUES (?,?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?,? ,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?, ?)`
  329. _, err = tx.Exec(sql1,
  330. uniqueID,
  331. record.OA_ID,
  332. record.OA_REQUESTID,
  333. record.OA_REQUESTNAME,
  334. record.OA_REQUESTNAMENEW,
  335. record.OA_REQUESTNAMEHTMLNEW,
  336. record.OA_GLXMID,
  337. record.OA_GLXMNAME,
  338. record.OA_SQSJ,
  339. record.OA_SALESPERSONNAME,
  340. record.OA_XSJSYX,
  341. record.OA_OPERATIONSPERSONNAME,
  342. record.OA_JFJSYX,
  343. record.OA_SYDW,
  344. record.OA_XMXXMS,
  345. record.OA_JDS,
  346. record.OA_NODECOUNT,
  347. record.OA_PRODUCTCODE,
  348. record.OA_PRODUCTNAME,
  349. record.OA_PRODUCTVERSION,
  350. record.OA_CPU,
  351. record.OA_OPERATINGSYSTEM,
  352. record.OA_MAINMAC,
  353. record.OA_SECONDMAC,
  354. record.OA_CREATIONDATE,
  355. record.OA_CREATIONTIME,
  356. record.OA_LASTOPERATEDATE,
  357. record.OA_LASTOPERATETIME,
  358. record.CaptureTime,
  359. )
  360. if err != nil {
  361. log.Println("Failed to sql1 insert record into target DB:", err)
  362. }
  363. sql2 := `
  364. INSERT INTO License_generate_Info (
  365. License_UniqueID,
  366. OA_ID,
  367. License_Flage)
  368. VALUES (?, ?,?)
  369. `
  370. _, err = tx.Exec(sql2,
  371. uniqueID,
  372. record.OA_ID,
  373. "未生成",
  374. )
  375. if err != nil {
  376. log.Println("Failed to sql2 insert record into target DB:", err)
  377. return err
  378. }
  379. return nil
  380. }