编程 用 Go 实现一个轻量级并发任务调度器(支持限速)

2025-05-07 12:48:52 +0800 CST views 408

用 Go 实现一个轻量级并发任务调度器(支持限速)

在学习 Go 的过程中,我尝试实现了一个轻量级的并发任务调度器,并支持速率限制、任务重试、优先级控制等功能。本文记录这一实现过程,也希望能为其他 Go 学习者提供参考。


🧭 为什么需要任务调度器

在微服务或分布式系统中,我们经常遇到以下问题:

  • 系统需要处理成千上万的异步任务(如 HTTP 请求、爬虫抓取等)
  • 控制并发量,避免下游服务压力过大
  • 遵守 API 配额,需要精确的速率控制
  • 需要任务重试、超时处理机制
  • 对任务执行结果进行收集和汇总

为了解决这些问题,我实现了一个功能完整、灵活可扩展的调度器,具备以下核心能力:

✅ 并发控制(Worker 池)
✅ 速率限制(令牌桶算法)
✅ 可配置重试机制
✅ 超时控制、优先级调度
✅ 可扩展监控与持久化


🔧 核心组件设计

Task 接口:任务抽象

type Task interface {
	ID() string
	Execute(ctx context.Context) (interface{}, error)
}

实现一个基础任务 SimpleTask

type SimpleTask struct {
	id     string
	action func() (interface{}, error)
}

func (t *SimpleTask) ID() string {
	return t.id
}

func (t *SimpleTask) Execute(ctx context.Context) (interface{}, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
		return t.action()
	}
}

Scheduler:调度器核心结构

type Scheduler struct {
	workerNum   int
	rateLimiter *rate.Limiter
	taskQueue   chan Task
	resultChan  chan *Result
	errorChan   chan *Error
	ctx         context.Context
	cancel      context.CancelFunc
	wg          sync.WaitGroup
}

结果与错误结构:

type Result struct {
	TaskID   string
	Output   interface{}
	Attempts int
}

type Error struct {
	TaskID   string
	Err      error
	Attempts int
}

启动与停止

func NewScheduler(workerNum, queueSize int) *Scheduler {
	ctx, cancel := context.WithCancel(context.Background())
	return &Scheduler{
		workerNum:  workerNum,
		taskQueue:  make(chan Task, queueSize),
		resultChan: make(chan *Result, queueSize),
		errorChan:  make(chan *Error, queueSize),
		ctx:        ctx,
		cancel:     cancel,
	}
}

func (s *Scheduler) SetRateLimit(perSecond int) {
	s.rateLimiter = rate.NewLimiter(rate.Limit(perSecond), perSecond)
}

func (s *Scheduler) Submit(task Task) {
	s.taskQueue <- task
}

func (s *Scheduler) Start() {
	for i := 0; i < s.workerNum; i++ {
		s.wg.Add(1)
		go s.worker()
	}
}

func (s *Scheduler) Stop() {
	s.cancel()
	s.wg.Wait()
	close(s.taskQueue)
	close(s.resultChan)
	close(s.errorChan)
}

Worker 协程实现

func (s *Scheduler) worker() {
	defer s.wg.Done()

	for {
		select {
		case <-s.ctx.Done():
			return
		case task := <-s.taskQueue:
			if s.rateLimiter != nil {
				if err := s.rateLimiter.Wait(s.ctx); err != nil {
					s.errorChan <- &Error{TaskID: task.ID(), Err: err}
					continue
				}
			}

			output, err := task.Execute(s.ctx)
			if err != nil {
				s.errorChan <- &Error{TaskID: task.ID(), Err: err}
			} else {
				s.resultChan <- &Result{TaskID: task.ID(), Output: output}
			}
		}
	}
}

⚙️ 高级功能扩展

任务重试

type WithRetry struct {
	task    Task
	max     int
	backoff time.Duration
}

func (r *WithRetry) ID() string { return r.task.ID() }

func (r *WithRetry) Execute(ctx context.Context) (interface{}, error) {
	var lastErr error
	for i := 0; i < r.max; i++ {
		if i > 0 {
			select {
			case <-time.After(r.backoff):
			case <-ctx.Done():
				return nil, ctx.Err()
			}
		}
		output, err := r.task.Execute(ctx)
		if err == nil {
			return output, nil
		}
		lastErr = err
	}
	return nil, fmt.Errorf("after %d attempts: %w", r.max, lastErr)
}

任务超时控制

type WithTimeout struct {
	task    Task
	timeout time.Duration
}

func (t *WithTimeout) ID() string { return t.task.ID() }

func (t *WithTimeout) Execute(ctx context.Context) (interface{}, error) {
	ctx, cancel := context.WithTimeout(ctx, t.timeout)
	defer cancel()
	return t.task.Execute(ctx)
}

🚀 使用示例

func main() {
	s := NewScheduler(3, 100)
	s.SetRateLimit(5)
	s.Start()

	// 收集结果
	go func() {
		for r := range s.resultChan {
			fmt.Printf("[SUCCESS] Task %s -> %v\n", r.TaskID, r.Output)
		}
	}()
	// 收集错误
	go func() {
		for e := range s.errorChan {
			fmt.Printf("[ERROR] Task %s -> %v\n", e.TaskID, e.Err)
		}
	}()

	for i := 0; i < 20; i++ {
		taskID := fmt.Sprintf("task-%d", i)
		task := &SimpleTask{
			id: taskID,
			action: func() (interface{}, error) {
				time.Sleep(100 * time.Millisecond)
				if rand.Intn(10) == 0 {
					return nil, fmt.Errorf("random error")
				}
				return fmt.Sprintf("result of %s", taskID), nil
			},
		}
		retry := &WithRetry{task: task, max: 3, backoff: 200 * time.Millisecond}
		s.Submit(retry)
	}

	time.Sleep(5 * time.Second)
	s.Stop()
}

📈 性能优化建议

  • 动态调整 worker 数量
  • 合并小任务为批处理任务
  • 使用任务亲和性减少上下文切换
  • 接入 Prometheus 实现实时监控
  • 使用 Kafka/Redis 实现任务持久化
  • 使用 etcd 实现分布式任务调度
  • 实现优雅停机处理(SIGTERM 捕获)

✅ 总结

本文实现了一个 Go 编写的轻量级任务调度器,具备高并发控制、速率限制、错误重试等能力,并通过组合模式支持了各种任务扩展。

适用场景包括:

  • HTTP 接口调用调度
  • 后台数据处理任务
  • 爬虫任务调度系统

未来你还可以将其升级为分布式任务调度器,或者对接可视化面板用于企业级任务管理。

推荐文章

Nginx 如何防止 DDoS 攻击
2024-11-18 21:51:48 +0800 CST
Vue3中如何进行错误处理?
2024-11-18 05:17:47 +0800 CST
介绍Vue3的静态提升是什么?
2024-11-18 10:25:10 +0800 CST
api接口怎么对接
2024-11-19 09:42:47 +0800 CST
避免 Go 语言中的接口污染
2024-11-19 05:20:53 +0800 CST
Vue3中如何处理路由和导航?
2024-11-18 16:56:14 +0800 CST
Nginx 反向代理
2024-11-19 08:02:10 +0800 CST
Gin 与 Layui 分页 HTML 生成工具
2024-11-19 09:20:21 +0800 CST
Vue3 中提供了哪些新的指令
2024-11-19 01:48:20 +0800 CST
Java环境中使用Elasticsearch
2024-11-18 22:46:32 +0800 CST
乐观锁和悲观锁,如何区分?
2024-11-19 09:36:53 +0800 CST
实现微信回调多域名的方法
2024-11-18 09:45:18 +0800 CST
服务器购买推荐
2024-11-18 23:48:02 +0800 CST
底部导航栏
2024-11-19 01:12:32 +0800 CST
windon安装beego框架记录
2024-11-19 09:55:33 +0800 CST
mendeley2 一个Python管理文献的库
2024-11-19 02:56:20 +0800 CST
程序员茄子在线接单