编程 用 Go 实现一个轻量级并发任务调度器(支持限速)

2025-05-07 12:48:52 +0800 CST views 236

用 Go 实现一个轻量级并发任务调度器(支持限速)

在学习 Go 的过程中,我尝试实现了一个轻量级的并发任务调度器,并支持速率限制、任务重试、优先级控制等功能。本文记录这一实现过程,也希望能为其他 Go 学习者提供参考。


🧭 为什么需要任务调度器

在微服务或分布式系统中,我们经常遇到以下问题:

  • 系统需要处理成千上万的异步任务(如 HTTP 请求、爬虫抓取等)
  • 控制并发量,避免下游服务压力过大
  • 遵守 API 配额,需要精确的速率控制
  • 需要任务重试、超时处理机制
  • 对任务执行结果进行收集和汇总

为了解决这些问题,我实现了一个功能完整、灵活可扩展的调度器,具备以下核心能力:

✅ 并发控制(Worker 池)
✅ 速率限制(令牌桶算法)
✅ 可配置重试机制
✅ 超时控制、优先级调度
✅ 可扩展监控与持久化


🔧 核心组件设计

Task 接口:任务抽象

type Task interface {
	ID() string
	Execute(ctx context.Context) (interface{}, error)
}

实现一个基础任务 SimpleTask

type SimpleTask struct {
	id     string
	action func() (interface{}, error)
}

func (t *SimpleTask) ID() string {
	return t.id
}

func (t *SimpleTask) Execute(ctx context.Context) (interface{}, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
		return t.action()
	}
}

Scheduler:调度器核心结构

type Scheduler struct {
	workerNum   int
	rateLimiter *rate.Limiter
	taskQueue   chan Task
	resultChan  chan *Result
	errorChan   chan *Error
	ctx         context.Context
	cancel      context.CancelFunc
	wg          sync.WaitGroup
}

结果与错误结构:

type Result struct {
	TaskID   string
	Output   interface{}
	Attempts int
}

type Error struct {
	TaskID   string
	Err      error
	Attempts int
}

启动与停止

func NewScheduler(workerNum, queueSize int) *Scheduler {
	ctx, cancel := context.WithCancel(context.Background())
	return &Scheduler{
		workerNum:  workerNum,
		taskQueue:  make(chan Task, queueSize),
		resultChan: make(chan *Result, queueSize),
		errorChan:  make(chan *Error, queueSize),
		ctx:        ctx,
		cancel:     cancel,
	}
}

func (s *Scheduler) SetRateLimit(perSecond int) {
	s.rateLimiter = rate.NewLimiter(rate.Limit(perSecond), perSecond)
}

func (s *Scheduler) Submit(task Task) {
	s.taskQueue <- task
}

func (s *Scheduler) Start() {
	for i := 0; i < s.workerNum; i++ {
		s.wg.Add(1)
		go s.worker()
	}
}

func (s *Scheduler) Stop() {
	s.cancel()
	s.wg.Wait()
	close(s.taskQueue)
	close(s.resultChan)
	close(s.errorChan)
}

Worker 协程实现

func (s *Scheduler) worker() {
	defer s.wg.Done()

	for {
		select {
		case <-s.ctx.Done():
			return
		case task := <-s.taskQueue:
			if s.rateLimiter != nil {
				if err := s.rateLimiter.Wait(s.ctx); err != nil {
					s.errorChan <- &Error{TaskID: task.ID(), Err: err}
					continue
				}
			}

			output, err := task.Execute(s.ctx)
			if err != nil {
				s.errorChan <- &Error{TaskID: task.ID(), Err: err}
			} else {
				s.resultChan <- &Result{TaskID: task.ID(), Output: output}
			}
		}
	}
}

⚙️ 高级功能扩展

任务重试

type WithRetry struct {
	task    Task
	max     int
	backoff time.Duration
}

func (r *WithRetry) ID() string { return r.task.ID() }

func (r *WithRetry) Execute(ctx context.Context) (interface{}, error) {
	var lastErr error
	for i := 0; i < r.max; i++ {
		if i > 0 {
			select {
			case <-time.After(r.backoff):
			case <-ctx.Done():
				return nil, ctx.Err()
			}
		}
		output, err := r.task.Execute(ctx)
		if err == nil {
			return output, nil
		}
		lastErr = err
	}
	return nil, fmt.Errorf("after %d attempts: %w", r.max, lastErr)
}

任务超时控制

type WithTimeout struct {
	task    Task
	timeout time.Duration
}

func (t *WithTimeout) ID() string { return t.task.ID() }

func (t *WithTimeout) Execute(ctx context.Context) (interface{}, error) {
	ctx, cancel := context.WithTimeout(ctx, t.timeout)
	defer cancel()
	return t.task.Execute(ctx)
}

🚀 使用示例

func main() {
	s := NewScheduler(3, 100)
	s.SetRateLimit(5)
	s.Start()

	// 收集结果
	go func() {
		for r := range s.resultChan {
			fmt.Printf("[SUCCESS] Task %s -> %v\n", r.TaskID, r.Output)
		}
	}()
	// 收集错误
	go func() {
		for e := range s.errorChan {
			fmt.Printf("[ERROR] Task %s -> %v\n", e.TaskID, e.Err)
		}
	}()

	for i := 0; i < 20; i++ {
		taskID := fmt.Sprintf("task-%d", i)
		task := &SimpleTask{
			id: taskID,
			action: func() (interface{}, error) {
				time.Sleep(100 * time.Millisecond)
				if rand.Intn(10) == 0 {
					return nil, fmt.Errorf("random error")
				}
				return fmt.Sprintf("result of %s", taskID), nil
			},
		}
		retry := &WithRetry{task: task, max: 3, backoff: 200 * time.Millisecond}
		s.Submit(retry)
	}

	time.Sleep(5 * time.Second)
	s.Stop()
}

📈 性能优化建议

  • 动态调整 worker 数量
  • 合并小任务为批处理任务
  • 使用任务亲和性减少上下文切换
  • 接入 Prometheus 实现实时监控
  • 使用 Kafka/Redis 实现任务持久化
  • 使用 etcd 实现分布式任务调度
  • 实现优雅停机处理(SIGTERM 捕获)

✅ 总结

本文实现了一个 Go 编写的轻量级任务调度器,具备高并发控制、速率限制、错误重试等能力,并通过组合模式支持了各种任务扩展。

适用场景包括:

  • HTTP 接口调用调度
  • 后台数据处理任务
  • 爬虫任务调度系统

未来你还可以将其升级为分布式任务调度器,或者对接可视化面板用于企业级任务管理。

推荐文章

html流光登陆页面
2024-11-18 15:36:18 +0800 CST
乐观锁和悲观锁,如何区分?
2024-11-19 09:36:53 +0800 CST
pin.gl是基于WebRTC的屏幕共享工具
2024-11-19 06:38:05 +0800 CST
Nginx 如何防止 DDoS 攻击
2024-11-18 21:51:48 +0800 CST
Python上下文管理器:with语句
2024-11-19 06:25:31 +0800 CST
使用Python实现邮件自动化
2024-11-18 20:18:14 +0800 CST
PHP中获取某个月份的天数
2024-11-18 11:28:47 +0800 CST
如何在Vue3中定义一个组件?
2024-11-17 04:15:09 +0800 CST
使用 sync.Pool 优化 Go 程序性能
2024-11-19 05:56:51 +0800 CST
平面设计常用尺寸
2024-11-19 02:20:22 +0800 CST
在 Rust 生产项目中存储数据
2024-11-19 02:35:11 +0800 CST
动态渐变背景
2024-11-19 01:49:50 +0800 CST
一个有趣的进度条
2024-11-19 09:56:04 +0800 CST
利用图片实现网站的加载速度
2024-11-18 12:29:31 +0800 CST
从Go开发者的视角看Rust
2024-11-18 11:49:49 +0800 CST
开源AI反混淆JS代码:HumanifyJS
2024-11-19 02:30:40 +0800 CST
虚拟DOM渲染器的内部机制
2024-11-19 06:49:23 +0800 CST
程序员茄子在线接单