package task import ( "context" "encoding/hex" "errors" "fmt" "math/rand" "sync" "time" ) // Manager 任务管理器,负责任务的创建、执行和状态跟踪 type Manager struct { tasks map[string]*Task // 任务ID到任务的映射 mutex sync.RWMutex // 保护tasks的互斥锁 workerPool chan struct{} // 工作池,控制并发数量 wg sync.WaitGroup // 等待所有任务完成 ctx context.Context // 上下文,用于取消所有任务 cancel context.CancelFunc // 取消函数 } // NewManager 创建新的任务管理器 // maxConcurrent 最大并发任务数 func NewManager(ctx context.Context, maxConcurrent int) *Manager { ctx, cancel := context.WithCancel(ctx) return &Manager{ tasks: make(map[string]*Task), workerPool: make(chan struct{}, maxConcurrent), ctx: ctx, cancel: cancel, } } // GenerateTaskID 生成唯一任务ID func (m *Manager) GenerateTaskID() string { bytes := make([]byte, 8) _, err := rand.Read(bytes) if err != nil { // 作为 fallback,使用时间戳+随机数 return fmt.Sprintf("%d%04d", time.Now().UnixNano(), rand.Intn(10000)) } return hex.EncodeToString(bytes) } // CreateTask 创建新任务但不执行 func (m *Manager) CreateTask(taskType string, opts ...TaskOption) *Task { taskID := m.GenerateTaskID() now := time.Now() task := &Task{ ID: taskID, Type: taskType, Status: TaskPending, Progress: 0, CreatedAt: now, } // 应用选项 for _, opt := range opts { opt(task) } m.mutex.Lock() m.tasks[taskID] = task m.mutex.Unlock() return task } // ExecuteTask 执行任务 func (m *Manager) ExecuteTask(task *Task, taskFunc TaskFunc) { m.wg.Add(1) go func() { defer m.wg.Done() // 等待工作池有空位或上下文被取消 select { case m.workerPool <- struct{}{}: defer func() { <-m.workerPool }() case <-m.ctx.Done(): m.updateTaskStatus(task.ID, TaskCancelled, 0, nil, m.ctx.Err().Error()) return } // 更新任务为运行中状态 startTime := time.Now() m.mutex.Lock() task.Status = TaskRunning task.StartedAt = &startTime m.mutex.Unlock() // 执行任务函数 result, err := taskFunc(m.ctx, func(progress int) { // 确保进度在0-100之间 if progress < 0 { progress = 0 } else if progress > 100 { progress = 100 } m.updateTaskProgress(task.ID, progress) }) // 更新任务完成状态 completedTime := time.Now() if err != nil { m.updateTaskStatus(task.ID, TaskFailed, 100, nil, err.Error()) } else { m.updateTaskStatus(task.ID, TaskSuccess, 100, result, "") } m.mutex.Lock() task.CompletedAt = &completedTime m.mutex.Unlock() }() } // SubmitTask 创建并执行任务 func (m *Manager) SubmitTask(taskType string, taskFunc TaskFunc, opts ...TaskOption) *Task { task := m.CreateTask(taskType, opts...) m.ExecuteTask(task, taskFunc) return task } // GetTaskStatus 获取任务状态 func (m *Manager) GetTaskStatus(taskID string) (*Task, error) { m.mutex.RLock() defer m.mutex.RUnlock() task, exists := m.tasks[taskID] if !exists { return nil, errors.New("任务不存在") } // 返回任务的副本,避免外部修改 return &Task{ ID: task.ID, Type: task.Type, Source: task.Source, Status: task.Status, Progress: task.Progress, Result: task.Result, Error: task.Error, CreatedAt: task.CreatedAt, StartedAt: copyTime(task.StartedAt), CompletedAt: copyTime(task.CompletedAt), }, nil } // CancelTask 取消任务 func (m *Manager) CancelTask(taskID string) error { m.mutex.Lock() defer m.mutex.Unlock() task, exists := m.tasks[taskID] if !exists { return errors.New("任务不存在") } // 只有等待中或运行中的任务可以被取消 if task.Status != TaskPending && task.Status != TaskRunning { return errors.New("任务无法取消") } task.Status = TaskCancelled completedTime := time.Now() task.CompletedAt = &completedTime task.Error = "任务已被取消" return nil } // ListTasks 列出指定状态的任务 func (m *Manager) ListTasks(status TaskStatus) []*Task { m.mutex.RLock() defer m.mutex.RUnlock() var result []*Task for _, task := range m.tasks { if status == "" || task.Status == status { // 返回副本 result = append(result, &Task{ ID: task.ID, Type: task.Type, Source: task.Source, Status: task.Status, Progress: task.Progress, Result: task.Result, Error: task.Error, CreatedAt: task.CreatedAt, StartedAt: copyTime(task.StartedAt), CompletedAt: copyTime(task.CompletedAt), }) } } return result } // Shutdown 关闭任务管理器,等待所有任务完成 func (m *Manager) Shutdown() { m.cancel() m.wg.Wait() } // 内部方法:更新任务进度 func (m *Manager) updateTaskProgress(taskID string, progress int) { m.mutex.Lock() defer m.mutex.Unlock() if task, exists := m.tasks[taskID]; exists { task.Progress = progress } } // 内部方法:更新任务状态 func (m *Manager) updateTaskStatus(taskID string, status TaskStatus, progress int, result interface{}, errMsg string) { m.mutex.Lock() defer m.mutex.Unlock() if task, exists := m.tasks[taskID]; exists { task.Status = status task.Progress = progress task.Result = result task.Error = errMsg } } // 辅助函数:复制时间指针 func copyTime(t *time.Time) *time.Time { if t == nil { return nil } newTime := *t return &newTime }