| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237 |
- package task
- import (
- "context"
- "encoding/hex"
- "errors"
- "fmt"
- "math/rand"
- "sync"
- "time"
- )
- // Manager 任务管理器,负责任务的创建、执行和状态跟踪
- type Manager struct {
- tasks map[string]*Task // 任务ID到任务的映射
- mutex sync.RWMutex // 保护tasks的互斥锁
- workerPool chan struct{} // 工作池,控制并发数量
- wg sync.WaitGroup // 等待所有任务完成
- ctx context.Context // 上下文,用于取消所有任务
- cancel context.CancelFunc // 取消函数
- }
- // NewManager 创建新的任务管理器
- // maxConcurrent 最大并发任务数
- func NewManager(ctx context.Context, maxConcurrent int) *Manager {
- ctx, cancel := context.WithCancel(ctx)
- return &Manager{
- tasks: make(map[string]*Task),
- workerPool: make(chan struct{}, maxConcurrent),
- ctx: ctx,
- cancel: cancel,
- }
- }
- // GenerateTaskID 生成唯一任务ID
- func (m *Manager) GenerateTaskID() string {
- bytes := make([]byte, 8)
- _, err := rand.Read(bytes)
- if err != nil {
- // 作为 fallback,使用时间戳+随机数
- return fmt.Sprintf("%d%04d", time.Now().UnixNano(), rand.Intn(10000))
- }
- return hex.EncodeToString(bytes)
- }
- // CreateTask 创建新任务但不执行
- func (m *Manager) CreateTask(taskType string, opts ...TaskOption) *Task {
- taskID := m.GenerateTaskID()
- now := time.Now()
- task := &Task{
- ID: taskID,
- Type: taskType,
- Status: TaskPending,
- Progress: 0,
- CreatedAt: now,
- }
- // 应用选项
- for _, opt := range opts {
- opt(task)
- }
- m.mutex.Lock()
- m.tasks[taskID] = task
- m.mutex.Unlock()
- return task
- }
- // ExecuteTask 执行任务
- func (m *Manager) ExecuteTask(task *Task, taskFunc TaskFunc) {
- m.wg.Add(1)
- go func() {
- defer m.wg.Done()
- // 等待工作池有空位或上下文被取消
- select {
- case m.workerPool <- struct{}{}:
- defer func() { <-m.workerPool }()
- case <-m.ctx.Done():
- m.updateTaskStatus(task.ID, TaskCancelled, 0, nil, m.ctx.Err().Error())
- return
- }
- // 更新任务为运行中状态
- startTime := time.Now()
- m.mutex.Lock()
- task.Status = TaskRunning
- task.StartedAt = &startTime
- m.mutex.Unlock()
- // 执行任务函数
- result, err := taskFunc(m.ctx, func(progress int) {
- // 确保进度在0-100之间
- if progress < 0 {
- progress = 0
- } else if progress > 100 {
- progress = 100
- }
- m.updateTaskProgress(task.ID, progress)
- })
- // 更新任务完成状态
- completedTime := time.Now()
- if err != nil {
- m.updateTaskStatus(task.ID, TaskFailed, 100, nil, err.Error())
- } else {
- m.updateTaskStatus(task.ID, TaskSuccess, 100, result, "")
- }
- m.mutex.Lock()
- task.CompletedAt = &completedTime
- m.mutex.Unlock()
- }()
- }
- // SubmitTask 创建并执行任务
- func (m *Manager) SubmitTask(taskType string, taskFunc TaskFunc, opts ...TaskOption) *Task {
- task := m.CreateTask(taskType, opts...)
- m.ExecuteTask(task, taskFunc)
- return task
- }
- // GetTaskStatus 获取任务状态
- func (m *Manager) GetTaskStatus(taskID string) (*Task, error) {
- m.mutex.RLock()
- defer m.mutex.RUnlock()
- task, exists := m.tasks[taskID]
- if !exists {
- return nil, errors.New("任务不存在")
- }
- // 返回任务的副本,避免外部修改
- return &Task{
- ID: task.ID,
- Type: task.Type,
- Source: task.Source,
- Status: task.Status,
- Progress: task.Progress,
- Result: task.Result,
- Error: task.Error,
- CreatedAt: task.CreatedAt,
- StartedAt: copyTime(task.StartedAt),
- CompletedAt: copyTime(task.CompletedAt),
- }, nil
- }
- // CancelTask 取消任务
- func (m *Manager) CancelTask(taskID string) error {
- m.mutex.Lock()
- defer m.mutex.Unlock()
- task, exists := m.tasks[taskID]
- if !exists {
- return errors.New("任务不存在")
- }
- // 只有等待中或运行中的任务可以被取消
- if task.Status != TaskPending && task.Status != TaskRunning {
- return errors.New("任务无法取消")
- }
- task.Status = TaskCancelled
- completedTime := time.Now()
- task.CompletedAt = &completedTime
- task.Error = "任务已被取消"
- return nil
- }
- // ListTasks 列出指定状态的任务
- func (m *Manager) ListTasks(status TaskStatus) []*Task {
- m.mutex.RLock()
- defer m.mutex.RUnlock()
- var result []*Task
- for _, task := range m.tasks {
- if status == "" || task.Status == status {
- // 返回副本
- result = append(result, &Task{
- ID: task.ID,
- Type: task.Type,
- Source: task.Source,
- Status: task.Status,
- Progress: task.Progress,
- Result: task.Result,
- Error: task.Error,
- CreatedAt: task.CreatedAt,
- StartedAt: copyTime(task.StartedAt),
- CompletedAt: copyTime(task.CompletedAt),
- })
- }
- }
- return result
- }
- // Shutdown 关闭任务管理器,等待所有任务完成
- func (m *Manager) Shutdown() {
- m.cancel()
- m.wg.Wait()
- }
- // 内部方法:更新任务进度
- func (m *Manager) updateTaskProgress(taskID string, progress int) {
- m.mutex.Lock()
- defer m.mutex.Unlock()
- if task, exists := m.tasks[taskID]; exists {
- task.Progress = progress
- }
- }
- // 内部方法:更新任务状态
- func (m *Manager) updateTaskStatus(taskID string, status TaskStatus, progress int, result interface{}, errMsg string) {
- m.mutex.Lock()
- defer m.mutex.Unlock()
- if task, exists := m.tasks[taskID]; exists {
- task.Status = status
- task.Progress = progress
- task.Result = result
- task.Error = errMsg
- }
- }
- // 辅助函数:复制时间指针
- func copyTime(t *time.Time) *time.Time {
- if t == nil {
- return nil
- }
- newTime := *t
- return &newTime
- }
|