manager.go 15 KB


  1. package task
  2. import (
  3. "context"
  4. "encoding/hex"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "math/rand"
  9. "sync"
  10. "time"
  11. logpkg "dbview/service/internal/common/logger"
  12. "github.com/gorilla/websocket"
  13. "go.uber.org/zap"
  14. )
  15. // Manager 任务管理器,负责任务的创建、执行和状态跟踪
  16. type Manager struct {
  17. tasks map[string]*Task // 任务ID到任务的映射
  18. mutex sync.RWMutex // 保护tasks的互斥锁
  19. workerPool chan struct{} // 工作池,控制并发数量
  20. wg sync.WaitGroup // 等待所有任务完成
  21. ctx context.Context // 上下文,用于取消所有任务
  22. cancel context.CancelFunc // 取消函数
  23. // WebSocket 订阅管理:taskID -> set of wrapped websocket connections
  24. wsMu sync.Mutex
  25. wsSubs map[string]map[*wsConn]struct{}
  26. connMap map[*websocket.Conn]*wsConn // 原始连接到包装的映射,用于查找
  27. }
  28. // NewManager 创建新的任务管理器
  29. // maxConcurrent 最大并发任务数
  30. func NewManager(ctx context.Context, maxConcurrent int) *Manager {
  31. ctx, cancel := context.WithCancel(ctx)
  32. return &Manager{
  33. tasks: make(map[string]*Task),
  34. workerPool: make(chan struct{}, maxConcurrent),
  35. ctx: ctx,
  36. cancel: cancel,
  37. wsSubs: make(map[string]map[*wsConn]struct{}),
  38. connMap: make(map[*websocket.Conn]*wsConn),
  39. }
  40. }
  41. // wsConn 是对 websocket.Conn 的包装结构。
  42. // 目的:
  43. // - 将原始的 *websocket.Conn 包装为带有发送/接收缓冲的本地结构,
  44. // 避免直接并发调用 websocket.Conn 的写方法(导致 concurrent write panic)。
  45. // - 提供独立的发送通道 `send`,由单独的 write pump 序列化所有写入操作;
  46. // 以及接收通道 `inbound`,由 read pump 将来自客户端的消息入队,供上层读取。
  47. //
  48. // 字段说明:
  49. // - conn: 底层的 *websocket.Conn 实例。
  50. // - send: 发送缓冲通道(消息由服务端入队,此通道由 write pump 串行写入 websocket)。
  51. // 缓冲大小应有限制以防止单一慢客户端耗尽内存(目前使用 256 条消息)。
  52. // - inbound: 接收缓冲通道(来自客户端的消息由 read pump 入队,上层通过 Register 获取此通道)。
  53. // inbound 的缓冲大小也有限制(目前使用 16 条消息),超过则丢弃客户端消息以保护服务器。
  54. // - closeCh: 一个用于通知 write pump 退出的信号通道(由 Unsubscribe 或清理逻辑关闭)。
  55. type wsConn struct {
  56. conn *websocket.Conn
  57. send chan []byte
  58. inbound chan []byte
  59. closeCh chan struct{}
  60. }
  61. const (
  62. writeWait = 10 * time.Second
  63. pongWait = 60 * time.Second
  64. pingPeriod = (pongWait * 9) / 10
  65. maxMessageSize = 1024 * 8
  66. )
  67. // GenerateTaskID 生成唯一任务ID
  68. func (m *Manager) GenerateTaskID() string {
  69. bytes := make([]byte, 8)
  70. _, err := rand.Read(bytes)
  71. if err != nil {
  72. // 作为 fallback,使用时间戳+随机数
  73. return fmt.Sprintf("%d%04d", time.Now().UnixNano(), rand.Intn(10000))
  74. }
  75. return hex.EncodeToString(bytes)
  76. }
  77. // CreateTask 创建新任务但不执行
  78. func (m *Manager) CreateTask(taskType string, opts ...TaskOption) *Task {
  79. taskID := m.GenerateTaskID()
  80. now := time.Now()
  81. task := &Task{
  82. ID: taskID,
  83. Type: taskType,
  84. Status: TaskPending,
  85. Progress: 0,
  86. CreatedAt: now,
  87. }
  88. // 应用选项
  89. for _, opt := range opts {
  90. opt(task)
  91. }
  92. m.mutex.Lock()
  93. m.tasks[taskID] = task
  94. m.mutex.Unlock()
  95. return task
  96. }
  97. // ExecuteTask 执行任务
  98. func (m *Manager) ExecuteTask(task *Task, taskFunc TaskFunc) {
  99. m.wg.Add(1)
  100. go func() {
  101. defer m.wg.Done()
  102. // 等待工作池有空位或上下文被取消
  103. select {
  104. case m.workerPool <- struct{}{}:
  105. defer func() { <-m.workerPool }()
  106. case <-m.ctx.Done():
  107. m.updateTaskStatus(task.ID, TaskCancelled, 0, nil, m.ctx.Err().Error())
  108. return
  109. }
  110. // 更新任务为运行中状态
  111. startTime := time.Now()
  112. m.mutex.Lock()
  113. task.Status = TaskRunning
  114. task.StartedAt = &startTime
  115. m.mutex.Unlock()
  116. // 通知订阅者任务已开始
  117. m.publishEvent(task.ID, map[string]interface{}{"taskId": task.ID, "status": string(TaskRunning), "progress": task.Progress})
  118. // 执行任务函数
  119. result, err := taskFunc(m.ctx, func(progress int) {
  120. // 确保进度在0-100之间
  121. if progress < 0 {
  122. progress = 0
  123. } else if progress > 100 {
  124. progress = 100
  125. }
  126. m.updateTaskProgress(task.ID, progress)
  127. })
  128. // 更新任务完成状态
  129. completedTime := time.Now()
  130. if err != nil {
  131. m.updateTaskStatus(task.ID, TaskFailed, 100, nil, err.Error())
  132. } else {
  133. m.updateTaskStatus(task.ID, TaskSuccess, 100, result, "")
  134. }
  135. m.mutex.Lock()
  136. task.CompletedAt = &completedTime
  137. m.mutex.Unlock()
  138. // 通知订阅者任务已完成(包含最终状态和结果/错误)
  139. m.publishEvent(task.ID, map[string]interface{}{"taskId": task.ID, "status": string(task.Status), "progress": task.Progress, "result": task.Result, "error": task.Error})
  140. }()
  141. }
  142. // SubmitTask 创建并执行任务
  143. func (m *Manager) SubmitTask(taskType string, taskFunc TaskFunc, opts ...TaskOption) *Task {
  144. task := m.CreateTask(taskType, opts...)
  145. m.ExecuteTask(task, taskFunc)
  146. return task
  147. }
  148. // GetTaskStatus 获取任务状态
  149. func (m *Manager) GetTaskStatus(taskID string) (*Task, error) {
  150. m.mutex.RLock()
  151. defer m.mutex.RUnlock()
  152. task, exists := m.tasks[taskID]
  153. if !exists {
  154. return nil, errors.New("任务不存在")
  155. }
  156. // 返回任务的副本,避免外部修改
  157. return &Task{
  158. ID: task.ID,
  159. Type: task.Type,
  160. Source: task.Source,
  161. Status: task.Status,
  162. Progress: task.Progress,
  163. Result: task.Result,
  164. Error: task.Error,
  165. CreatedAt: task.CreatedAt,
  166. StartedAt: copyTime(task.StartedAt),
  167. CompletedAt: copyTime(task.CompletedAt),
  168. }, nil
  169. }
  170. // CancelTask 取消任务
  171. func (m *Manager) CancelTask(taskID string) error {
  172. m.mutex.Lock()
  173. defer m.mutex.Unlock()
  174. task, exists := m.tasks[taskID]
  175. if !exists {
  176. return errors.New("任务不存在")
  177. }
  178. // 只有等待中或运行中的任务可以被取消
  179. if task.Status != TaskPending && task.Status != TaskRunning {
  180. return errors.New("任务无法取消")
  181. }
  182. task.Status = TaskCancelled
  183. completedTime := time.Now()
  184. task.CompletedAt = &completedTime
  185. task.Error = "任务已被取消"
  186. return nil
  187. }
  188. // ListTasks 列出指定状态的任务
  189. func (m *Manager) ListTasks(status TaskStatus) []*Task {
  190. m.mutex.RLock()
  191. defer m.mutex.RUnlock()
  192. var result []*Task
  193. for _, task := range m.tasks {
  194. if status == "" || task.Status == status {
  195. // 返回副本
  196. result = append(result, &Task{
  197. ID: task.ID,
  198. Type: task.Type,
  199. Source: task.Source,
  200. Status: task.Status,
  201. Progress: task.Progress,
  202. Result: task.Result,
  203. Error: task.Error,
  204. CreatedAt: task.CreatedAt,
  205. StartedAt: copyTime(task.StartedAt),
  206. CompletedAt: copyTime(task.CompletedAt),
  207. })
  208. }
  209. }
  210. return result
  211. }
  212. // Shutdown 关闭任务管理器,等待所有任务完成
  213. func (m *Manager) Shutdown() {
  214. m.cancel()
  215. m.wg.Wait()
  216. }
  217. // 内部方法:更新任务进度
  218. func (m *Manager) updateTaskProgress(taskID string, progress int) {
  219. m.mutex.Lock()
  220. defer m.mutex.Unlock()
  221. if task, exists := m.tasks[taskID]; exists {
  222. task.Progress = progress
  223. // 发布进度更新给订阅者
  224. m.publishEvent(taskID, map[string]interface{}{"taskId": taskID, "status": string(task.Status), "progress": task.Progress})
  225. }
  226. }
  227. // 内部方法:更新任务状态
  228. func (m *Manager) updateTaskStatus(taskID string, status TaskStatus, progress int, result interface{}, errMsg string) {
  229. m.mutex.Lock()
  230. defer m.mutex.Unlock()
  231. if task, exists := m.tasks[taskID]; exists {
  232. task.Status = status
  233. task.Progress = progress
  234. task.Result = result
  235. task.Error = errMsg
  236. // 发布状态变更事件(非阻塞)
  237. go m.publishEvent(taskID, map[string]interface{}{"taskId": taskID, "status": string(status), "progress": progress, "result": result, "error": errMsg})
  238. }
  239. }
  240. // Subscribe 将 websocket 连接注册到指定 taskID 的订阅列表
  241. func (m *Manager) Subscribe(conn *websocket.Conn, taskID string) {
  242. m.wsMu.Lock()
  243. defer m.wsMu.Unlock()
  244. // 获取或创建包装
  245. w := m.connMap[conn]
  246. if w == nil {
  247. // 如果尚未注册读写泵,注册一个
  248. w = &wsConn{conn: conn}
  249. m.connMap[conn] = w
  250. // lazy init channels to avoid nil panics
  251. w.send = make(chan []byte, 256)
  252. w.inbound = make(chan []byte, 16)
  253. w.closeCh = make(chan struct{})
  254. m.startPumps(w)
  255. }
  256. subs, ok := m.wsSubs[taskID]
  257. if !ok {
  258. subs = make(map[*wsConn]struct{})
  259. m.wsSubs[taskID] = subs
  260. }
  261. subs[w] = struct{}{}
  262. // log subscribe
  263. lg := logpkg.FromContext(context.Background())
  264. lg.Info("ws subscribe", zap.String("taskId", taskID), zap.String("remote", conn.RemoteAddr().String()))
  265. }
  266. // Register 注册 websocket 连接并启动对应的读/写 pump。
  267. // 返回值:
  268. // - inbound 通道:上层(handler)应从该通道读取客户端发来的消息(订阅/退订等命令)。
  269. //
  270. // 使用说明:
  271. // - 如果同一个 *websocket.Conn 重复调用 Register,会返回已存在的 inbound 通道(幂等)。
  272. // - Register 会创建并启动两个 goroutine:read pump(将客户端消息写入 inbound)和
  273. // write pump(从 send 通道中读取消息并序列化写入底层连接,还负责心跳 ping)。
  274. //
  275. // 并发/安全性:
  276. // - Register 在内部持有 wsMu 锁以保证 connMap 的并发安全。
  277. // - write pump 是唯一会直接调用底层 conn.Write* 的协程,从而避免 concurrent write panic。
  278. func (m *Manager) Register(conn *websocket.Conn) chan []byte {
  279. m.wsMu.Lock()
  280. defer m.wsMu.Unlock()
  281. if existing, ok := m.connMap[conn]; ok {
  282. return existing.inbound
  283. }
  284. w := &wsConn{
  285. conn: conn,
  286. send: make(chan []byte, 256),
  287. inbound: make(chan []byte, 16),
  288. closeCh: make(chan struct{}),
  289. }
  290. m.connMap[conn] = w
  291. m.startPumps(w)
  292. lg := logpkg.FromContext(context.Background())
  293. lg.Info("ws conn registered", zap.String("remote", conn.RemoteAddr().String()))
  294. return w.inbound
  295. }
  296. // Unsubscribe 从指定 taskID 的订阅列表移除连接。
  297. // 行为说明:
  298. // - 仅从该 task 的订阅集合中移除包装 wsConn,如果该连接不再订阅任何任务,则完全移除并关闭关联资源。
  299. // - 当连接不再被任何 task 使用时,函数会从 connMap 中删除对应条目,关闭 w.closeCh(通知 write pump 退出),
  300. // 并记录连接被移除的日志。注意:read pump 也会在检测到连接断开时做相似清理(互补)。
  301. //
  302. // 并发性:
  303. // - 本方法持有 wsMu 锁以保护 wsSubs 与 connMap 的一致性。
  304. func (m *Manager) Unsubscribe(conn *websocket.Conn, taskID string) {
  305. m.wsMu.Lock()
  306. defer m.wsMu.Unlock()
  307. w := m.connMap[conn]
  308. if w == nil {
  309. return
  310. }
  311. if subs, ok := m.wsSubs[taskID]; ok {
  312. delete(subs, w)
  313. if len(subs) == 0 {
  314. delete(m.wsSubs, taskID)
  315. }
  316. }
  317. // 如果该连接不再订阅任何 task,则从 connMap 移除
  318. stillUsed := false
  319. for _, subs := range m.wsSubs {
  320. if _, ok := subs[w]; ok {
  321. stillUsed = true
  322. break
  323. }
  324. }
  325. if !stillUsed {
  326. delete(m.connMap, conn)
  327. // close pumps
  328. close(w.closeCh)
  329. lg := logpkg.FromContext(context.Background())
  330. lg.Info("ws conn removed", zap.String("remote", conn.RemoteAddr().String()))
  331. }
  332. }
  333. // publishEvent 将事件推送给订阅 taskID 的所有 websocket 连接(非阻塞入队到各自 send 通道)。
  334. // 设计要点:
  335. // - publishEvent 不直接往 websocket 连接写入数据,而是将序列化后的消息入队到每个订阅者的 `send` 缓冲通道,
  336. // 由各自的 write pump 负责实际的网络写入,从而避免并发写冲突。
  337. // - 为了保护服务器稳定性,如果某个连接的 send 通道已满(可能由慢客户端导致),当前实现会记录告警并取消该连接的订阅,
  338. // 并尝试关闭底层连接。该策略可以避免单个慢客户端或恶意客户端耗尽服务器资源。
  339. // - 注意:publishEvent 在跨进程/多实例部署下仅能推送到本实例内的订阅者,若需要跨实例广播需引入外部 pub/sub(如 Redis)。
  340. func (m *Manager) publishEvent(taskID string, event interface{}) {
  341. m.wsMu.Lock()
  342. subs := m.wsSubs[taskID]
  343. m.wsMu.Unlock()
  344. if subs == nil || len(subs) == 0 {
  345. return
  346. }
  347. b, err := json.Marshal(event)
  348. if err != nil {
  349. return
  350. }
  351. lg := logpkg.FromContext(context.Background())
  352. for w := range subs {
  353. select {
  354. case w.send <- b:
  355. // enqueued
  356. default:
  357. // send buffer full, drop and unregister
  358. lg.Warn("ws send buffer full, unsubscribing", zap.String("taskId", taskID), zap.String("remote", w.conn.RemoteAddr().String()))
  359. m.Unsubscribe(w.conn, taskID)
  360. _ = w.conn.Close()
  361. }
  362. }
  363. }
  364. // WriteJSON 将 v 序列化为 JSON 并尝试安全地发送到指定的 websocket 连接。
  365. // 如果该连接由 manager 包装(存在于 connMap),则消息会被入队到其 send 通道,由 write pump 串行写出;
  366. // 否则会直接使用 conn.WriteJSON 写入(不推荐,因为可能造成并发写)。
  367. // 返回错误场景:
  368. // - JSON 序列化失败 -> 返回序列化错误
  369. // - send 通道已满 -> 返回错误并记录警告(调用方可选择重试或忽略)
  370. //
  371. // 说明:推荐上层总是通过 Manager.WriteJSON 或 publishEvent 让 write pump 负责写操作,避免 concurrent write 问题。
  372. func (m *Manager) WriteJSON(conn *websocket.Conn, v interface{}) error {
  373. m.wsMu.Lock()
  374. w := m.connMap[conn]
  375. m.wsMu.Unlock()
  376. if w == nil {
  377. // 如果连接未包装,直接写(但这不推荐)
  378. return conn.WriteJSON(v)
  379. }
  380. b, err := json.Marshal(v)
  381. if err != nil {
  382. return err
  383. }
  384. select {
  385. case w.send <- b:
  386. return nil
  387. default:
  388. // buffer full
  389. lg := logpkg.FromContext(context.Background())
  390. lg.Warn("ws writejson buffer full", zap.String("remote", conn.RemoteAddr().String()))
  391. return fmt.Errorf("ws send buffer full")
  392. }
  393. }
  394. // 辅助函数:复制时间指针
  395. func copyTime(t *time.Time) *time.Time {
  396. if t == nil {
  397. return nil
  398. }
  399. newTime := *t
  400. return &newTime
  401. }
  402. // startPumps 启动 read/write pumps,使用 wsConn 的 channels
  403. func (m *Manager) startPumps(w *wsConn) {
  404. // read pump
  405. go func() {
  406. conn := w.conn
  407. conn.SetReadLimit(maxMessageSize)
  408. conn.SetReadDeadline(time.Now().Add(pongWait))
  409. conn.SetPongHandler(func(string) error {
  410. conn.SetReadDeadline(time.Now().Add(pongWait))
  411. return nil
  412. })
  413. for {
  414. _, message, err := conn.ReadMessage()
  415. if err != nil {
  416. // read error or closed
  417. break
  418. }
  419. select {
  420. case w.inbound <- message:
  421. default:
  422. // drop if inbound full
  423. }
  424. }
  425. // cleanup on exit
  426. m.wsMu.Lock()
  427. delete(m.connMap, conn)
  428. // remove from all subscriptions
  429. for tid, subs := range m.wsSubs {
  430. if _, ok := subs[w]; ok {
  431. delete(subs, w)
  432. if len(subs) == 0 {
  433. delete(m.wsSubs, tid)
  434. }
  435. }
  436. }
  437. m.wsMu.Unlock()
  438. close(w.send)
  439. close(w.inbound)
  440. _ = conn.Close()
  441. }()
  442. // write pump
  443. go func() {
  444. conn := w.conn
  445. ticker := time.NewTicker(pingPeriod)
  446. defer func() {
  447. ticker.Stop()
  448. _ = conn.Close()
  449. }()
  450. for {
  451. select {
  452. case message, ok := <-w.send:
  453. conn.SetWriteDeadline(time.Now().Add(writeWait))
  454. if !ok {
  455. // channel closed
  456. _ = conn.WriteMessage(websocket.CloseMessage, []byte{})
  457. return
  458. }
  459. if err := conn.WriteMessage(websocket.TextMessage, message); err != nil {
  460. return
  461. }
  462. case <-ticker.C:
  463. conn.SetWriteDeadline(time.Now().Add(writeWait))
  464. if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
  465. return
  466. }
  467. case <-w.closeCh:
  468. return
  469. }
  470. }
  471. }()
  472. }