编程 用 Go 实现一个轻量级并发任务调度器(支持限速)

2025-05-07 12:48:52 +0800 CST views 867

用 Go 实现一个轻量级并发任务调度器(支持限速)

在学习 Go 的过程中,我尝试实现了一个轻量级的并发任务调度器,并支持速率限制、任务重试、优先级控制等功能。本文记录这一实现过程,也希望能为其他 Go 学习者提供参考。


🧭 为什么需要任务调度器

在微服务或分布式系统中,我们经常遇到以下问题:

  • 系统需要处理成千上万的异步任务(如 HTTP 请求、爬虫抓取等)
  • 控制并发量,避免下游服务压力过大
  • 遵守 API 配额,需要精确的速率控制
  • 需要任务重试、超时处理机制
  • 对任务执行结果进行收集和汇总

为了解决这些问题,我实现了一个功能完整、灵活可扩展的调度器,具备以下核心能力:

✅ 并发控制(Worker 池)
✅ 速率限制(令牌桶算法)
✅ 可配置重试机制
✅ 超时控制、优先级调度
✅ 可扩展监控与持久化


🔧 核心组件设计

Task 接口:任务抽象

type Task interface {
	ID() string
	Execute(ctx context.Context) (interface{}, error)
}

实现一个基础任务 SimpleTask

type SimpleTask struct {
	id     string
	action func() (interface{}, error)
}

func (t *SimpleTask) ID() string {
	return t.id
}

func (t *SimpleTask) Execute(ctx context.Context) (interface{}, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
		return t.action()
	}
}

Scheduler:调度器核心结构

type Scheduler struct {
	workerNum   int
	rateLimiter *rate.Limiter
	taskQueue   chan Task
	resultChan  chan *Result
	errorChan   chan *Error
	ctx         context.Context
	cancel      context.CancelFunc
	wg          sync.WaitGroup
}

结果与错误结构:

type Result struct {
	TaskID   string
	Output   interface{}
	Attempts int
}

type Error struct {
	TaskID   string
	Err      error
	Attempts int
}

启动与停止

func NewScheduler(workerNum, queueSize int) *Scheduler {
	ctx, cancel := context.WithCancel(context.Background())
	return &Scheduler{
		workerNum:  workerNum,
		taskQueue:  make(chan Task, queueSize),
		resultChan: make(chan *Result, queueSize),
		errorChan:  make(chan *Error, queueSize),
		ctx:        ctx,
		cancel:     cancel,
	}
}

func (s *Scheduler) SetRateLimit(perSecond int) {
	s.rateLimiter = rate.NewLimiter(rate.Limit(perSecond), perSecond)
}

func (s *Scheduler) Submit(task Task) {
	s.taskQueue <- task
}

func (s *Scheduler) Start() {
	for i := 0; i < s.workerNum; i++ {
		s.wg.Add(1)
		go s.worker()
	}
}

func (s *Scheduler) Stop() {
	s.cancel()
	s.wg.Wait()
	close(s.taskQueue)
	close(s.resultChan)
	close(s.errorChan)
}

Worker 协程实现

func (s *Scheduler) worker() {
	defer s.wg.Done()

	for {
		select {
		case <-s.ctx.Done():
			return
		case task := <-s.taskQueue:
			if s.rateLimiter != nil {
				if err := s.rateLimiter.Wait(s.ctx); err != nil {
					s.errorChan <- &Error{TaskID: task.ID(), Err: err}
					continue
				}
			}

			output, err := task.Execute(s.ctx)
			if err != nil {
				s.errorChan <- &Error{TaskID: task.ID(), Err: err}
			} else {
				s.resultChan <- &Result{TaskID: task.ID(), Output: output}
			}
		}
	}
}

⚙️ 高级功能扩展

任务重试

type WithRetry struct {
	task    Task
	max     int
	backoff time.Duration
}

func (r *WithRetry) ID() string { return r.task.ID() }

func (r *WithRetry) Execute(ctx context.Context) (interface{}, error) {
	var lastErr error
	for i := 0; i < r.max; i++ {
		if i > 0 {
			select {
			case <-time.After(r.backoff):
			case <-ctx.Done():
				return nil, ctx.Err()
			}
		}
		output, err := r.task.Execute(ctx)
		if err == nil {
			return output, nil
		}
		lastErr = err
	}
	return nil, fmt.Errorf("after %d attempts: %w", r.max, lastErr)
}

任务超时控制

type WithTimeout struct {
	task    Task
	timeout time.Duration
}

func (t *WithTimeout) ID() string { return t.task.ID() }

func (t *WithTimeout) Execute(ctx context.Context) (interface{}, error) {
	ctx, cancel := context.WithTimeout(ctx, t.timeout)
	defer cancel()
	return t.task.Execute(ctx)
}

🚀 使用示例

func main() {
	s := NewScheduler(3, 100)
	s.SetRateLimit(5)
	s.Start()

	// 收集结果
	go func() {
		for r := range s.resultChan {
			fmt.Printf("[SUCCESS] Task %s -> %v\n", r.TaskID, r.Output)
		}
	}()
	// 收集错误
	go func() {
		for e := range s.errorChan {
			fmt.Printf("[ERROR] Task %s -> %v\n", e.TaskID, e.Err)
		}
	}()

	for i := 0; i < 20; i++ {
		taskID := fmt.Sprintf("task-%d", i)
		task := &SimpleTask{
			id: taskID,
			action: func() (interface{}, error) {
				time.Sleep(100 * time.Millisecond)
				if rand.Intn(10) == 0 {
					return nil, fmt.Errorf("random error")
				}
				return fmt.Sprintf("result of %s", taskID), nil
			},
		}
		retry := &WithRetry{task: task, max: 3, backoff: 200 * time.Millisecond}
		s.Submit(retry)
	}

	time.Sleep(5 * time.Second)
	s.Stop()
}

📈 性能优化建议

  • 动态调整 worker 数量
  • 合并小任务为批处理任务
  • 使用任务亲和性减少上下文切换
  • 接入 Prometheus 实现实时监控
  • 使用 Kafka/Redis 实现任务持久化
  • 使用 etcd 实现分布式任务调度
  • 实现优雅停机处理(SIGTERM 捕获)

✅ 总结

本文实现了一个 Go 编写的轻量级任务调度器,具备高并发控制、速率限制、错误重试等能力,并通过组合模式支持了各种任务扩展。

适用场景包括:

  • HTTP 接口调用调度
  • 后台数据处理任务
  • 爬虫任务调度系统

未来你还可以将其升级为分布式任务调度器,或者对接可视化面板用于企业级任务管理。

推荐文章

Go配置镜像源代理
2024-11-19 09:10:35 +0800 CST
liunx服务器监控workerman进程守护
2024-11-18 13:28:44 +0800 CST
纯CSS绘制iPhoneX的外观
2024-11-19 06:39:43 +0800 CST
PostgreSQL日常运维命令总结分享
2024-11-18 06:58:22 +0800 CST
pin.gl是基于WebRTC的屏幕共享工具
2024-11-19 06:38:05 +0800 CST
如何在 Vue 3 中使用 TypeScript?
2024-11-18 22:30:18 +0800 CST
Nginx负载均衡详解
2024-11-17 07:43:48 +0800 CST
Vue3中的Scoped Slots有什么改变?
2024-11-17 13:50:01 +0800 CST
Vue3中如何实现插件?
2024-11-18 04:27:04 +0800 CST
Web 端 Office 文件预览工具库
2024-11-18 22:19:16 +0800 CST
php curl并发代码
2024-11-18 01:45:03 +0800 CST
Nginx 负载均衡
2024-11-19 10:03:14 +0800 CST
curl错误代码表
2024-11-17 09:34:46 +0800 CST
使用Python实现邮件自动化
2024-11-18 20:18:14 +0800 CST
php常用的正则表达式
2024-11-19 03:48:35 +0800 CST
Roop是一款免费开源的AI换脸工具
2024-11-19 08:31:01 +0800 CST
随机分数html
2025-01-25 10:56:34 +0800 CST
初学者的 Rust Web 开发指南
2024-11-18 10:51:35 +0800 CST
Vue3中如何处理路由和导航?
2024-11-18 16:56:14 +0800 CST
Vue3中如何扩展VNode?
2024-11-17 19:33:18 +0800 CST
Vue3中如何处理权限控制?
2024-11-18 05:36:30 +0800 CST
程序员茄子在线接单