manager.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. package task
import (
	"context"
	crand "crypto/rand"
	"encoding/hex"
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"time"
)
  11. // Manager 任务管理器,负责任务的创建、执行和状态跟踪
  12. type Manager struct {
  13. tasks map[string]*Task // 任务ID到任务的映射
  14. mutex sync.RWMutex // 保护tasks的互斥锁
  15. workerPool chan struct{} // 工作池,控制并发数量
  16. wg sync.WaitGroup // 等待所有任务完成
  17. ctx context.Context // 上下文,用于取消所有任务
  18. cancel context.CancelFunc // 取消函数
  19. }
  20. // NewManager 创建新的任务管理器
  21. // maxConcurrent 最大并发任务数
  22. func NewManager(ctx context.Context, maxConcurrent int) *Manager {
  23. ctx, cancel := context.WithCancel(ctx)
  24. return &Manager{
  25. tasks: make(map[string]*Task),
  26. workerPool: make(chan struct{}, maxConcurrent),
  27. ctx: ctx,
  28. cancel: cancel,
  29. }
  30. }
  31. // GenerateTaskID 生成唯一任务ID
  32. func (m *Manager) GenerateTaskID() string {
  33. bytes := make([]byte, 8)
  34. _, err := rand.Read(bytes)
  35. if err != nil {
  36. // 作为 fallback,使用时间戳+随机数
  37. return fmt.Sprintf("%d%04d", time.Now().UnixNano(), rand.Intn(10000))
  38. }
  39. return hex.EncodeToString(bytes)
  40. }
  41. // CreateTask 创建新任务但不执行
  42. func (m *Manager) CreateTask(taskType string, opts ...TaskOption) *Task {
  43. taskID := m.GenerateTaskID()
  44. now := time.Now()
  45. task := &Task{
  46. ID: taskID,
  47. Type: taskType,
  48. Status: TaskPending,
  49. Progress: 0,
  50. CreatedAt: now,
  51. }
  52. // 应用选项
  53. for _, opt := range opts {
  54. opt(task)
  55. }
  56. m.mutex.Lock()
  57. m.tasks[taskID] = task
  58. m.mutex.Unlock()
  59. return task
  60. }
  61. // ExecuteTask 执行任务
  62. func (m *Manager) ExecuteTask(task *Task, taskFunc TaskFunc) {
  63. m.wg.Add(1)
  64. go func() {
  65. defer m.wg.Done()
  66. // 等待工作池有空位或上下文被取消
  67. select {
  68. case m.workerPool <- struct{}{}:
  69. defer func() { <-m.workerPool }()
  70. case <-m.ctx.Done():
  71. m.updateTaskStatus(task.ID, TaskCancelled, 0, nil, m.ctx.Err().Error())
  72. return
  73. }
  74. // 更新任务为运行中状态
  75. startTime := time.Now()
  76. m.mutex.Lock()
  77. task.Status = TaskRunning
  78. task.StartedAt = &startTime
  79. m.mutex.Unlock()
  80. // 执行任务函数
  81. result, err := taskFunc(m.ctx, func(progress int) {
  82. // 确保进度在0-100之间
  83. if progress < 0 {
  84. progress = 0
  85. } else if progress > 100 {
  86. progress = 100
  87. }
  88. m.updateTaskProgress(task.ID, progress)
  89. })
  90. // 更新任务完成状态
  91. completedTime := time.Now()
  92. if err != nil {
  93. m.updateTaskStatus(task.ID, TaskFailed, 100, nil, err.Error())
  94. } else {
  95. m.updateTaskStatus(task.ID, TaskSuccess, 100, result, "")
  96. }
  97. m.mutex.Lock()
  98. task.CompletedAt = &completedTime
  99. m.mutex.Unlock()
  100. }()
  101. }
  102. // SubmitTask 创建并执行任务
  103. func (m *Manager) SubmitTask(taskType string, taskFunc TaskFunc, opts ...TaskOption) *Task {
  104. task := m.CreateTask(taskType, opts...)
  105. m.ExecuteTask(task, taskFunc)
  106. return task
  107. }
  108. // GetTaskStatus 获取任务状态
  109. func (m *Manager) GetTaskStatus(taskID string) (*Task, error) {
  110. m.mutex.RLock()
  111. defer m.mutex.RUnlock()
  112. task, exists := m.tasks[taskID]
  113. if !exists {
  114. return nil, errors.New("任务不存在")
  115. }
  116. // 返回任务的副本,避免外部修改
  117. return &Task{
  118. ID: task.ID,
  119. Type: task.Type,
  120. Source: task.Source,
  121. Status: task.Status,
  122. Progress: task.Progress,
  123. Result: task.Result,
  124. Error: task.Error,
  125. CreatedAt: task.CreatedAt,
  126. StartedAt: copyTime(task.StartedAt),
  127. CompletedAt: copyTime(task.CompletedAt),
  128. }, nil
  129. }
  130. // CancelTask 取消任务
  131. func (m *Manager) CancelTask(taskID string) error {
  132. m.mutex.Lock()
  133. defer m.mutex.Unlock()
  134. task, exists := m.tasks[taskID]
  135. if !exists {
  136. return errors.New("任务不存在")
  137. }
  138. // 只有等待中或运行中的任务可以被取消
  139. if task.Status != TaskPending && task.Status != TaskRunning {
  140. return errors.New("任务无法取消")
  141. }
  142. task.Status = TaskCancelled
  143. completedTime := time.Now()
  144. task.CompletedAt = &completedTime
  145. task.Error = "任务已被取消"
  146. return nil
  147. }
  148. // ListTasks 列出指定状态的任务
  149. func (m *Manager) ListTasks(status TaskStatus) []*Task {
  150. m.mutex.RLock()
  151. defer m.mutex.RUnlock()
  152. var result []*Task
  153. for _, task := range m.tasks {
  154. if status == "" || task.Status == status {
  155. // 返回副本
  156. result = append(result, &Task{
  157. ID: task.ID,
  158. Type: task.Type,
  159. Source: task.Source,
  160. Status: task.Status,
  161. Progress: task.Progress,
  162. Result: task.Result,
  163. Error: task.Error,
  164. CreatedAt: task.CreatedAt,
  165. StartedAt: copyTime(task.StartedAt),
  166. CompletedAt: copyTime(task.CompletedAt),
  167. })
  168. }
  169. }
  170. return result
  171. }
  172. // Shutdown 关闭任务管理器,等待所有任务完成
  173. func (m *Manager) Shutdown() {
  174. m.cancel()
  175. m.wg.Wait()
  176. }
  177. // 内部方法:更新任务进度
  178. func (m *Manager) updateTaskProgress(taskID string, progress int) {
  179. m.mutex.Lock()
  180. defer m.mutex.Unlock()
  181. if task, exists := m.tasks[taskID]; exists {
  182. task.Progress = progress
  183. }
  184. }
  185. // 内部方法:更新任务状态
  186. func (m *Manager) updateTaskStatus(taskID string, status TaskStatus, progress int, result interface{}, errMsg string) {
  187. m.mutex.Lock()
  188. defer m.mutex.Unlock()
  189. if task, exists := m.tasks[taskID]; exists {
  190. task.Status = status
  191. task.Progress = progress
  192. task.Result = result
  193. task.Error = errMsg
  194. }
  195. }
  196. // 辅助函数:复制时间指针
  197. func copyTime(t *time.Time) *time.Time {
  198. if t == nil {
  199. return nil
  200. }
  201. newTime := *t
  202. return &newTime
  203. }