2团
Published on 2024-08-16 / 22 Visits
0
0

通过限流器学习Golang context.WithTimeout合理用法

1. 前提

手头有个压测项目需要应用限流器,各压测节点需要向压测主节点批量申请Token以创建Tcp Client。限流器选择使用golang.org/x/time/rate(Uber提供的限流器性能更好,但是扩展性一般)。

压测子节点向主节点申请Token时,会设置超时时间,避免请求被长时间阻塞。

2. 应用

// 令牌桶容量
const BurstsNum = 5

// 每15ms发放5个token,这样可以批量获取token,避免slave http调用过于频繁
var limiter = rate.NewLimiter(rate.Every(15*time.Millisecond), BurstsNum)

func GetNToken(n int, duration time.Duration) bool {
	if n <= 0 || n > BurstsNum {
		logger.Errorf("token num must large than 0 and less than bursts num: %d", BurstsNum)
		return false
	}

    // 通过Context设置超时时间,超时时间为2秒
	timeoutCtx, cancel := context.WithTimeout(context.Background(), duration)
    // 必须显式调用cancel,避免限流器在超时时间内返回令牌,context中的相关资源未被合理释放
	defer cancel()
	if err := limiter.WaitN(timeoutCtx, n); err != nil {
		logger.Errorf("get n token failed, err: %s", err.Error())
		return false
	}
	return true
}

上述代码创建了一个令牌桶,每15ms往桶内放置5个令牌,消费令牌的时候,会传入携带超时设置的context,避免消费请求被长时间阻塞。

// WaitN blocks until lim permits n events to happen.
// It returns an error if n exceeds the Limiter's burst size, the Context is
// canceled, or the expected wait time exceeds the Context's Deadline.
// The burst limit is ignored if the rate limit is Inf.
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
	// 创建定时器生成函数
	newTimer := func(d time.Duration) (<-chan time.Time, func() bool, func()) {
		timer := time.NewTimer(d)
		return timer.C, timer.Stop, func() {}
	}

	return lim.wait(ctx, n, time.Now(), newTimer)
}

// wait is the internal implementation of WaitN.
func (lim *Limiter) wait(ctx context.Context, n int, t time.Time, newTimer func(d time.Duration) (<-chan time.Time, func() bool, func())) error {
	lim.mu.Lock()
	burst := lim.burst
	limit := lim.limit
	lim.mu.Unlock()

    // 如果要消费的令牌数量大于令牌桶上限,且限流器的时间间隔不是无限,则无法正常消费令牌
	if n > burst && limit != Inf {
		return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)
	}
	// 检查当前传入的Context是否已经结束,如果已结束,直接返回超时取消信息
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}
	// 生产指定数量令牌的时间初始为无限
	waitLimit := InfDuration
    // 如果传入的Context设置了超时时间,则计算超时时间与当前时间的时间间隔,赋值给等待时间
	if deadline, ok := ctx.Deadline(); ok {
		waitLimit = deadline.Sub(t)
	}
	// 预估在等待时间内是否能够生产待消费的令牌,如果不能,则返回生产超时错误信息
    // 需要注意,如果能够在等待时间内生产足够的令牌,则待消费的令牌已经被预定,其他消费者只能排队消费后续生产的令牌
	r := lim.reserveN(t, n, waitLimit)
	if !r.ok {
		return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
	}
	// 计算生产指定数量令牌所需的时间间隔
	delay := r.DelayFrom(t)
    // 如果当前的令牌满足要求,直接返回
	if delay == 0 {
		return nil
	}
    // 根据生产等待时间,创建指定的定时器
	ch, stop, advance := newTimer(delay)
    // 记得在函数退出前,销毁定时器,释放资源
	defer stop()
	advance() // only has an effect when testing
	select {
	case <-ch:
		// 等待时间已满,此时消费令牌成功,返回
		return nil
	case <-ctx.Done():
        // 如果Context因为各种原因被提前取消了,则迅速释放被预定消费的令牌,供其他消费者消费
		r.Cancel()
		return ctx.Err()
	}
}

通过上述代码注释,可以一窥context.WithTimeout的合理用法。


Comment