在开发高并发系统时有三把利器用来保护系统:缓存、降级和限流。

  • 缓存,提升系统访问速度和增大系统能处理的容量,可谓是抗高并发流量的银弹
  • 降级,当服务出问题或者影响到核心流程的性能则需要暂时屏蔽掉,待高峰或者问题解决后再打开
  • 限流,通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务(定向到错误页或告知资源没有了)、排队或等待(比如秒杀、评论、下单)、降级(返回兜底数据或默认数据,如商品详情页库存默认有货)。

一般开发高并发系统常见的限流有:限制总并发数(比如数据库连接池、线程池)、限制瞬时并发数(如nginx的limit_conn模块,用来限制瞬时并发连接数)、限制时间窗口内的平均速率(如Guava的RateLimiter、nginx的limit_req模块,限制每秒的平均速率);其他还有如限制远程接口调用速率、限制MQ的消费速率。另外还可以根据网络连接数、网络流量、CPU或内存负载等来限流。

限流算法

常见的限流算法有:计数器、滑动窗口、漏桶、令牌桶。

计数器算法

采用计数器实现限流有点简单粗暴,一般我们会限制一秒钟的能够通过的请求数,比如限流qps为100,算法的实现思路就是从第一个请求进来开始计时,在接下去的1s内,每来一个请求,就把计数加1,如果累加的数字达到了100,那么后续的请求就会被全部拒绝。等到1s结束后,把计数恢复成0,重新开始计数。

弊端

  • “突刺现象”,如果我在单位时间1s内的前10ms,已经通过了100个请求,那后面的990ms,只能眼巴巴的把请求拒绝。
  • “时间临界点”缺陷,例如规定的1分钟最多100个请求,也就是每秒钟最多1.7个请求,在00:00:59时,瞬间发送了100个请求,并且00:01:00又瞬间发送了100个请求,那么其实这个用户在1秒里面,瞬间发送了200个请求。系统可能会承受恶意用户的大量请求,甚至击穿系统。

示例代码

示例一:

package main

import (
	"fmt"
	"sync"
	"time"
)

type LimitRate struct {
	rate  int           //计数周期内最多允许的请求数
	begin time.Time     //计数开始时间
	cycle time.Duration //计数周期
	count int           //计数周期内累计收到的请求数
	lock  sync.Mutex
}

func (l *LimitRate) Allow() bool {
	l.lock.Lock()
	defer l.lock.Unlock()

	if l.count == l.rate-1 {
		now := time.Now()
		if now.Sub(l.begin) >= l.cycle {
			//速度允许范围内, 重置计数器
			l.Reset(now)
			return true
		} else {
			return false
		}
	} else {
		//没有达到速率限制,计数加1
		l.count++
		return true
	}
}

func (l *LimitRate) Set(r int, cycle time.Duration) {
	l.rate = r
	l.begin = time.Now()
	l.cycle = cycle
	l.count = 0
}

func (l *LimitRate) Reset(t time.Time) {
	l.begin = t
	l.count = 0
}

func main() {
	var wg sync.WaitGroup
	var lr LimitRate
	lr.Set(3, time.Second) // 1s内最多请求3次

	for i := 0; i < 10; i++ {
		wg.Add(1)

		fmt.Println("Create req", i, time.Now())
		go func(i int) {
			if lr.Allow() {
				fmt.Println("--> Respon req", i, time.Now())
			}
			wg.Done()
		}(i)

		time.Sleep(200 * time.Millisecond)
	}
	wg.Wait()
}
➜  Desktop go run demo.go
Create req 0 2019-04-26 14:03:15.199687 +0800 CST m=+0.000279946
--> Respon req 0 2019-04-26 14:03:15.199941 +0800 CST m=+0.000533627
Create req 1 2019-04-26 14:03:15.401612 +0800 CST m=+0.202201856
--> Respon req 1 2019-04-26 14:03:15.401685 +0800 CST m=+0.202275403
Create req 2 2019-04-26 14:03:15.603891 +0800 CST m=+0.404478517
Create req 3 2019-04-26 14:03:15.805591 +0800 CST m=+0.606175892
Create req 4 2019-04-26 14:03:16.007908 +0800 CST m=+0.808490115
Create req 5 2019-04-26 14:03:16.212403 +0800 CST m=+1.012982672
--> Respon req 5 2019-04-26 14:03:16.212545 +0800 CST m=+1.013124115
Create req 6 2019-04-26 14:03:16.417028 +0800 CST m=+1.217605082
--> Respon req 6 2019-04-26 14:03:16.417187 +0800 CST m=+1.217763798
Create req 7 2019-04-26 14:03:16.61831 +0800 CST m=+1.418884059
--> Respon req 7 2019-04-26 14:03:16.618555 +0800 CST m=+1.419129152
Create req 8 2019-04-26 14:03:16.819132 +0800 CST m=+1.619707477
Create req 9 2019-04-26 14:03:17.02441 +0800 CST m=+1.824979126

示例一的方案是超过请求速率直接舍弃请求,示例二为超过请求速率的后续请求依次被排队处理成功,修改 Allow 方法:

func (l *LimitRate) Allow() bool {
	l.lock.Lock()
	defer l.lock.Unlock()

	if l.count == l.rate-1 {
		for {
			now := time.Now()
			if now.Sub(l.begin) >= l.cycle {
				//速度允许范围内, 重置计数器
				l.Reset(now)
				return true
			} else {
				// wait
			}
		}
	} else {
		//没有达到速率限制,计数加1
		l.count++
		return true
	}
}

漏桶算法(Leaky Bucket)

漏桶(Leaky Bucket)算法思路很简单,

  • 水(请求)先进入到漏桶里
  • 漏桶以一定的速度出水(接口有响应速率)
  • 当水流入速度过大会直接溢出(请求访问频率超过接口响应速率),然后就拒绝请求

leaky
leaky

漏桶可以看作是一个带有常量服务时间的单服务器队列,如果漏桶(包缓存)溢出,那么数据包会被丢弃。 在网络中,漏桶算法可以控制端口的流量输出速率,平滑网络上的突发流量,实现流量整形,从而为网络提供一个稳定的流量。

在某些情况下,漏桶算法不能够有效地使用网络资源。因为漏桶的漏出速率是固定的参数,所以,即使网络中不存在资源冲突(没有发生拥塞),漏桶算法也不能使某一个单独的流突发到端口速率。因此,漏桶算法对于存在突发特性的流量来说缺乏效率。而令牌桶算法则能够满足这些具有突发特性的流量。通常,漏桶算法与令牌桶算法可以结合起来为网络流量提供更大的控制。

示例代码

package main

import (
	"fmt"
	"math"
	"sync"
	"time"
)

type LeakyBucket struct {
	rate       float64 //固定每秒出水速率
	capacity   float64 //桶的容量
	water      float64 //桶中当前水量
	lastLeakMs int64   //桶上次漏水时间戳 ms

	lock sync.Mutex
}

func (l *LeakyBucket) Allow() bool {
	l.lock.Lock()
	defer l.lock.Unlock()

	// 1. 计算时间区间内会漏出多少水
	// 2. 计算桶内剩余的水量
	// 3. 根据水量进行判断结果

	now := time.Now().UnixNano() / 1e6
	eclipse := float64((now - l.lastLeakMs)) * l.rate / 1000 //先执行漏水
	l.water = l.water - eclipse                              //计算剩余水量
	l.water = math.Max(0, l.water)                           //判断桶是否干了
	l.lastLeakMs = now
	if (l.water + 1) < l.capacity {
		// 尝试加水,并且水还未满
		l.water++
		return true
	}

	// 水满,拒绝加水
	return false
}

func (l *LeakyBucket) Set(r, c float64) {
	l.rate = r
	l.capacity = c
	l.water = 0
	l.lastLeakMs = time.Now().UnixNano() / 1e6
}

func main() {
	var wg sync.WaitGroup
	var lr LeakyBucket
	lr.Set(3, 3) //每秒访问速率限制为3个请求,桶容量为3

	for i := 0; i < 10; i++ {
		wg.Add(1)

		fmt.Println("Create req", i, time.Now())
		go func(i int) {
			if lr.Allow() {
				fmt.Println("--> Respon req", i, time.Now())
			}
			wg.Done()
		}(i)

		time.Sleep(100 * time.Millisecond)
	}
	wg.Wait()
}

➜  Desktop go run demo.go
Create req 0 2019-04-26 14:21:15.584799 +0800 CST m=+0.000242467
--> Respon req 0 2019-04-26 14:21:15.585055 +0800 CST m=+0.000498952
Create req 1 2019-04-26 14:21:15.685606 +0800 CST m=+0.101047914
--> Respon req 1 2019-04-26 14:21:15.685693 +0800 CST m=+0.101135159
Create req 2 2019-04-26 14:21:15.785765 +0800 CST m=+0.201206053
--> Respon req 2 2019-04-26 14:21:15.785894 +0800 CST m=+0.201334943
Create req 3 2019-04-26 14:21:15.886463 +0800 CST m=+0.301902515
Create req 4 2019-04-26 14:21:15.988655 +0800 CST m=+0.404093353
--> Respon req 4 2019-04-26 14:21:15.988783 +0800 CST m=+0.404221263
Create req 5 2019-04-26 14:21:16.093164 +0800 CST m=+0.508601441
Create req 6 2019-04-26 14:21:16.193452 +0800 CST m=+0.608887401
Create req 7 2019-04-26 14:21:16.295473 +0800 CST m=+0.710907621
--> Respon req 7 2019-04-26 14:21:16.295547 +0800 CST m=+0.710981255
Create req 8 2019-04-26 14:21:16.395596 +0800 CST m=+0.811029203
Create req 9 2019-04-26 14:21:16.496065 +0800 CST m=+0.911496585

令牌桶算法(Token Bucket)

令牌桶算法的原理是

  • 以一个恒定的速度往桶里放入令牌,如果桶已经满了就不再加了
  • 如果请求需要被处理,则需要先从桶里获取一个令牌
  • 当桶里没有令牌可取时,则拒绝服务

例如,系统会按恒定 1/QPS 秒时间间隔(如果QPS=100,则间隔是10ms) 往桶里加入Token(想象和漏洞漏水相反,有个水龙头在不断的加水),如果桶已经满了就不再加了。新请求来临时,会各自拿走一个Token,如果没有Token可拿了就阻塞或者拒绝服务。

令牌桶的另外一个好处是可以方便的改变速度。一旦需要提高速率,则按需提高放入桶中的令牌的速率。 一般会定时(比如100毫秒)往桶中增加一定数量的令牌, 有些变种算法则实时的计算应该增加的令牌的数量。

token_bucket
token_bucket

示例代码

package main

import (
	"fmt"
	"sync"
	"time"
)

type TokenBucket struct {
	rate         int64 //固定的token放入速率, r/s
	capacity     int64 //桶的容量
	tokens       int64 //桶中当前token数量
	lastTokenSec int64 //桶上次放token的时间戳 s

	lock sync.Mutex
}

func (l *TokenBucket) Allow() bool {
	l.lock.Lock()
	defer l.lock.Unlock()

	// 1. 计算会有多少令牌,
	// 2. 如果令牌超过限制则不添加
	// 3. 取令牌

	now := time.Now().Unix()
	l.tokens = l.tokens + (now-l.lastTokenSec)*l.rate // 先添加令牌
	if l.tokens > l.capacity {
		l.tokens = l.capacity
	}
	l.lastTokenSec = now
	if l.tokens > 0 {
		// 还有令牌,领取令牌
		l.tokens--
		return true
	}

	// 没有令牌,则拒绝
	return false
}

func (l *TokenBucket) Set(r, c int64) {
	l.rate = r
	l.capacity = c
	l.tokens = 0
	l.lastTokenSec = time.Now().Unix()
}

func main() {
	var wg sync.WaitGroup
	var lr TokenBucket
	lr.Set(3, 3) //每秒访问速率限制为3个请求,桶容量为3

	time.Sleep(time.Second)
	for i := 0; i < 10; i++ {
		wg.Add(1)

		fmt.Println("Create req", i, time.Now())
		go func(i int) {
			if lr.Allow() {
				fmt.Println("--> Respon req", i, time.Now())
			}
			wg.Done()
		}(i)

		time.Sleep(200 * time.Millisecond)
	}
	wg.Wait()
}
➜  Desktop go run demo.go
Create req 0 2019-04-26 14:26:17.143325 +0800 CST m=+1.000477091
--> Respon req 0 2019-04-26 14:26:17.143825 +0800 CST m=+1.000977046
Create req 1 2019-04-26 14:26:17.343865 +0800 CST m=+1.201014713
--> Respon req 1 2019-04-26 14:26:17.344034 +0800 CST m=+1.201183419
Create req 2 2019-04-26 14:26:17.54822 +0800 CST m=+1.405367439
--> Respon req 2 2019-04-26 14:26:17.548539 +0800 CST m=+1.405686429
Create req 3 2019-04-26 14:26:17.752402 +0800 CST m=+1.609546920
Create req 4 2019-04-26 14:26:17.952578 +0800 CST m=+1.809719933
Create req 5 2019-04-26 14:26:18.152836 +0800 CST m=+2.009974826
--> Respon req 5 2019-04-26 14:26:18.152914 +0800 CST m=+2.010052499
Create req 6 2019-04-26 14:26:18.353009 +0800 CST m=+2.210145817
--> Respon req 6 2019-04-26 14:26:18.353114 +0800 CST m=+2.210250112
Create req 7 2019-04-26 14:26:18.553429 +0800 CST m=+2.410563051
--> Respon req 7 2019-04-26 14:26:18.553552 +0800 CST m=+2.410685553
Create req 8 2019-04-26 14:26:18.758259 +0800 CST m=+2.615389898
Create req 9 2019-04-26 14:26:18.960854 +0800 CST m=+2.817982809

其他示例

Google开源工具包Guava提供了限流工具类RateLimiter,该类基于令牌桶算法(Token Bucket)来完成限流。它支持两种获取permits接口,一种是如果拿不到立刻返回false,一种会阻塞等待一段时间看能不能拿到.

  • 有很多任务需要执行,但是我们不希望每秒超过两个任务执行。
  • 假如我们会产生一个数据流,然后我们想以每秒5kb的速度发送出去.我们可以每获取一个令牌(permit)就发送一个byte的数据,这样我们就可以通过一个每秒5000个令牌

实现QPS速率的最简单的方式就是记住上一次请求的最后授权时间,然后保证1/QPS秒内不允许请求进入.比如QPS=5,如果我们保证最后一个被授权请求之后的200ms的时间内没有请求被授权,那么我们就达到了预期的速率.如果一个请求现在过来但是最后一个被授权请求是在100ms之前,那么我们就要求当前这个请求等待100ms.按照这个思路,请求15个新令牌(许可证)就需要3秒。

令牌桶是允许一定程度的并发的,比如同一个时刻,有100个用户请求,只要令牌桶中有100个令牌,那么这100个请求全都会放过去。令牌桶在桶中没有令牌的情况下也会退化为漏桶模型。

rate 示例

golang.org/x/time/rate 包采用了令牌桶算法来实现速率限制

// Limit 定义了事件的最大频率。
// Limit 被表示为每秒事件的数量。
// 值为0的Limit不允许任何事件。
type Limit float64

// 返回一个新的Limiter实例,
// 事件发生率为r,并允许至多b个令牌爆发。
func NewLimiter(r Limit, b int) *Limiter

// Every 将事件之间的最小时间间隔转换为 Limit。
func Every(interval time.Duration) Limit
package main

import (
	"net/http"

	"golang.org/x/time/rate"
)

// 事件发生率为r,并允许至多b个令牌爆发。
var limiter = rate.NewLimiter(2, 5) 

func limit(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if !limiter.Allow() {
			http.Error(w, http.StatusText(http.StatusTooManyRequests), http.StatusTooManyRequests)
			return
		}
		next.ServeHTTP(w, r)
	})
}

func okHandler(w http.ResponseWriter, r *http.Request) {
	w.Write([]byte("OK\n"))
}

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/", okHandler)
	http.ListenAndServe(":4000", limit(mux))
}
➜  ~ while true; do
while> curl http://127.0.0.1:4000
while> sleep 0.1s
while> done
OK
OK
OK
OK
OK
OK
Too Many Requests
Too Many Requests
OK
Too Many Requests
Too Many Requests
Too Many Requests
OK
Too Many Requests
Too Many Requests
Too Many Requests
OK
Too Many Requests
Too Many Requests
Too Many Requests
OK

rate-limiting 示例

https://gobyexample.com/rate-limiting

// <em>[Rate limiting](http://en.wikipedia.org/wiki/Rate_limiting)</em>
// is an important mechanism for controlling resource
// utilization and maintaining quality of service. Go
// elegantly supports rate limiting with goroutines,
// channels, and [tickers](tickers).

package main

import "time"
import "fmt"

func main() {

    // First we'll look at basic rate limiting. Suppose
    // we want to limit our handling of incoming requests.
    // We'll serve these requests off a channel of the
    // same name.
    requests := make(chan int, 5)
    for i := 1; i <= 5; i++ {
        requests <- i
    }
    close(requests)

    // This `limiter` channel will receive a value
    // every 200 milliseconds. This is the regulator in
    // our rate limiting scheme.
    limiter := time.Tick(200 * time.Millisecond)

    // By blocking on a receive from the `limiter` channel
    // before serving each request, we limit ourselves to
    // 1 request every 200 milliseconds.
    for req := range requests {
        <-limiter
        fmt.Println("request", req, time.Now())
    }

    // We may want to allow short bursts of requests in
    // our rate limiting scheme while preserving the
    // overall rate limit. We can accomplish this by
    // buffering our limiter channel. This `burstyLimiter`
    // channel will allow bursts of up to 3 events.
    burstyLimiter := make(chan time.Time, 3)

    // Fill up the channel to represent allowed bursting.
    for i := 0; i < 3; i++ {
        burstyLimiter <- time.Now()
    }

    // Every 200 milliseconds we'll try to add a new
    // value to `burstyLimiter`, up to its limit of 3.
    go func() {
        for t := range time.Tick(200 * time.Millisecond) {
            burstyLimiter <- t
        }
    }()

    // Now simulate 5 more incoming requests. The first
    // 3 of these will benefit from the burst capability
    // of `burstyLimiter`.
    burstyRequests := make(chan int, 5)
    for i := 1; i <= 5; i++ {
        burstyRequests <- i
    }
    close(burstyRequests)
    for req := range burstyRequests {
        <-burstyLimiter
        fmt.Println("request", req, time.Now())
    }
}

参考链接