1. 前提
手头有个压测项目需要应用限流器,各压测节点需要向压测主节点批量申请Token以创建Tcp Client。限流器选择使用golang.org/x/time/rate
(Uber提供的限流器性能更好,但是扩展性一般)。
压测子节点向主节点申请Token时,会设置超时时间,避免请求被长时间阻塞。
2. 应用
// 令牌桶容量
const BurstsNum = 5
// 每15ms发放5个token,这样可以批量获取token,避免slave http调用过于频繁
var limiter = rate.NewLimiter(rate.Every(15*time.Millisecond), BurstsNum)
func GetNToken(n int, duration time.Duration) bool {
if n <= 0 || n > BurstsNum {
logger.Errorf("token num must large than 0 and less than bursts num: %d", BurstsNum)
return false
}
// 通过Context设置超时时间,超时时间为2秒
timeoutCtx, cancel := context.WithTimeout(context.Background(), duration)
// 必须显式调用cancel,避免限流器在超时时间内返回令牌,context中的相关资源未被合理释放
defer cancel()
if err := limiter.WaitN(timeoutCtx, n); err != nil {
logger.Errorf("get n token failed, err: %s", err.Error())
return false
}
return true
}
上述代码创建了一个令牌桶,每15ms往桶内放置5个令牌,消费令牌的时候,会传入携带超时设置的context,避免消费请求被长时间阻塞。
// WaitN blocks until lim permits n events to happen.
// It returns an error if n exceeds the Limiter's burst size, the Context is
// canceled, or the expected wait time exceeds the Context's Deadline.
// The burst limit is ignored if the rate limit is Inf.
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
// 创建定时器生成函数
newTimer := func(d time.Duration) (<-chan time.Time, func() bool, func()) {
timer := time.NewTimer(d)
return timer.C, timer.Stop, func() {}
}
return lim.wait(ctx, n, time.Now(), newTimer)
}
// wait is the internal implementation of WaitN.
func (lim *Limiter) wait(ctx context.Context, n int, t time.Time, newTimer func(d time.Duration) (<-chan time.Time, func() bool, func())) error {
lim.mu.Lock()
burst := lim.burst
limit := lim.limit
lim.mu.Unlock()
// 如果要消费的令牌数量大于令牌桶上限,且限流器的时间间隔不是无限,则无法正常消费令牌
if n > burst && limit != Inf {
return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)
}
// 检查当前传入的Context是否已经结束,如果已结束,直接返回超时取消信息
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// 生产指定数量令牌的时间初始为无限
waitLimit := InfDuration
// 如果传入的Context设置了超时时间,则计算超时时间与当前时间的时间间隔,赋值给等待时间
if deadline, ok := ctx.Deadline(); ok {
waitLimit = deadline.Sub(t)
}
// 预估在等待时间内是否能够生产待消费的令牌,如果不能,则返回生产超时错误信息
// 需要注意,如果能够在等待时间内生产足够的令牌,则待消费的令牌已经被预定,其他消费者只能排队消费后续生产的令牌
r := lim.reserveN(t, n, waitLimit)
if !r.ok {
return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
}
// 计算生产指定数量令牌所需的时间间隔
delay := r.DelayFrom(t)
// 如果当前的令牌满足要求,直接返回
if delay == 0 {
return nil
}
// 根据生产等待时间,创建指定的定时器
ch, stop, advance := newTimer(delay)
// 记得在函数退出前,销毁定时器,释放资源
defer stop()
advance() // only has an effect when testing
select {
case <-ch:
// 等待时间已满,此时消费令牌成功,返回
return nil
case <-ctx.Done():
// 如果Context因为各种原因被提前取消了,则迅速释放被预定消费的令牌,供其他消费者消费
r.Cancel()
return ctx.Err()
}
}
通过上述代码注释,可以一窥context.WithTimeout的合理用法。