您好,欢迎访问一九零五行业门户网

怎么使用Go+Redis实现常见限流算法

固定窗口使用redis实现固定窗口比较简单,主要是由于固定窗口同时只会存在一个窗口,所以我们可以在第一次进入窗口时使用pexpire命令设置过期时间为窗口时间大小,这样窗口会随过期时间而失效,同时我们使用incr命令增加窗口计数。
因为我们需要在counter==1的时候设置窗口的过期时间,为了保证原子性,我们使用简单的lua脚本实现。
const fixedwindowlimitertryacquireredisscript = `-- argv[1]: 窗口时间大小-- argv[2]: 窗口请求上限local window = tonumber(argv[1])local limit = tonumber(argv[2])-- 获取原始值local counter = tonumber(redis.call("get", keys[1]))if counter == nil then counter = 0end-- 若到达窗口请求上限,请求失败if counter >= limit then return 0end-- 窗口值+1redis.call("incr", keys[1])if counter == 0 then redis.call("pexpire", keys[1], window)endreturn 1`
package redisimport ( "context" "errors" "github.com/go-redis/redis/v8" "time")// fixedwindowlimiter 固定窗口限流器type fixedwindowlimiter struct { limit int // 窗口请求上限 window int // 窗口时间大小 client *redis.client // redis客户端 script *redis.script // tryacquire脚本}func newfixedwindowlimiter(client *redis.client, limit int, window time.duration) (*fixedwindowlimiter, error) { // redis过期时间精度最大到毫秒,因此窗口必须能被毫秒整除 if window%time.millisecond != 0 { return nil, errors.new("the window uint must not be less than millisecond") } return &fixedwindowlimiter{ limit: limit, window: int(window / time.millisecond), client: client, script: redis.newscript(fixedwindowlimitertryacquireredisscript), }, nil}func (l *fixedwindowlimiter) tryacquire(ctx context.context, resource string) error { success, err := l.script.run(ctx, l.client, []string{resource}, l.window, l.limit).bool() if err != nil { return err } // 若到达窗口请求上限,请求失败 if !success { return erracquirefailed } return nil}
滑动窗口hash实现我们使用redis的hash存储每个小窗口的计数,每次请求会把所有有效窗口的计数累加到count,使用hdel删除失效窗口,最后判断窗口的总计数是否大于上限。
我们基本上把所有的逻辑都放到lua脚本里面,其中大头是对hash的遍历,时间复杂度是o(n),n是小窗口数量,所以小窗口数量最好不要太多。
const slidingwindowlimitertryacquireredisscripthashimpl = `-- argv[1]: 窗口时间大小-- argv[2]: 窗口请求上限-- argv[3]: 当前小窗口值-- argv[4]: 起始小窗口值local window = tonumber(argv[1])local limit = tonumber(argv[2])local currentsmallwindow = tonumber(argv[3])local startsmallwindow = tonumber(argv[4])-- 计算当前窗口的请求总数local counters = redis.call("hgetall", keys[1])local count = 0for i = 1, #(counters) / 2 do local smallwindow = tonumber(counters[i * 2 - 1]) local counter = tonumber(counters[i * 2]) if smallwindow < startsmallwindow then redis.call("hdel", keys[1], smallwindow) else count = count + counter endend-- 若到达窗口请求上限,请求失败if count >= limit then return 0end-- 若没到窗口请求上限,当前小窗口计数器+1,请求成功redis.call("hincrby", keys[1], currentsmallwindow, 1)redis.call("pexpire", keys[1], window)return 1`
package redisimport ( "context" "errors" "github.com/go-redis/redis/v8" "time")// slidingwindowlimiter 滑动窗口限流器type slidingwindowlimiter struct { limit int // 窗口请求上限 window int64 // 窗口时间大小 smallwindow int64 // 小窗口时间大小 smallwindows int64 // 小窗口数量 client *redis.client // redis客户端 script *redis.script // tryacquire脚本}func newslidingwindowlimiter(client *redis.client, limit int, window, smallwindow time.duration) ( *slidingwindowlimiter, error) { // redis过期时间精度最大到毫秒,因此窗口必须能被毫秒整除 if window%time.millisecond != 0 || smallwindow%time.millisecond != 0 { return nil, errors.new("the window uint must not be less than millisecond") } // 窗口时间必须能够被小窗口时间整除 if window%smallwindow != 0 { return nil, errors.new("window cannot be split by integers") } return &slidingwindowlimiter{ limit: limit, window: int64(window / time.millisecond), smallwindow: int64(smallwindow / time.millisecond), smallwindows: int64(window / smallwindow), client: client, script: redis.newscript(slidingwindowlimitertryacquireredisscripthashimpl), }, nil}func (l *slidingwindowlimiter) tryacquire(ctx context.context, resource string) error { // 获取当前小窗口值 currentsmallwindow := time.now().unixmilli() / l.smallwindow * l.smallwindow // 获取起始小窗口值 startsmallwindow := currentsmallwindow - l.smallwindow*(l.smallwindows-1) success, err := l.script.run( ctx, l.client, []string{resource}, l.window, l.limit, currentsmallwindow, startsmallwindow).bool() if err != nil { return err } // 若到达窗口请求上限,请求失败 if !success { return erracquirefailed } return nil}
list实现如果小窗口数量特别多,可以使用list优化时间复杂度,list的结构是:
[counter, smallwindow1, count1, smallwindow2, count2, smallwindow3, count3...]
也就是我们使用list的第一个元素存储计数器,每个窗口用两个元素表示,第一个元素表示小窗口值,第二个元素表示这个小窗口的计数。由于redis lua脚本不支持字符串分割函数,因此不能将小窗口的值和计数放在同一元素中。
具体操作流程:
1.获取list长度
2.如果长度是0,设置counter,长度+1
3.如果长度大于1,获取第二第三个元素
如果该值小于起始小窗口值,counter-第三个元素的值,删除第二第三个元素,长度-2
4.如果counter大于等于limit,请求失败
5.如果长度大于1,获取倒数第二第一个元素
如果倒数第二个元素小窗口值大于等于当前小窗口值,表示当前请求因为网络延迟的问题,到达服务器的时候,窗口已经过时了,把倒数第二个元素当成当前小窗口(因为它更新),倒数第一个元素值+1
否则,添加新的窗口值,添加新的计数(1),更新过期时间
6.否则,添加新的窗口值,添加新的计数(1),更新过期时间
7.counter + 1
8.返回成功
const slidingwindowlimitertryacquireredisscriptlistimpl = `-- argv[1]: 窗口时间大小-- argv[2]: 窗口请求上限-- argv[3]: 当前小窗口值-- argv[4]: 起始小窗口值local window = tonumber(argv[1])local limit = tonumber(argv[2])local currentsmallwindow = tonumber(argv[3])local startsmallwindow = tonumber(argv[4])-- 获取list长度local len = redis.call("llen", keys[1])-- 如果长度是0,设置counter,长度+1local counter = 0if len == 0 then redis.call("rpush", keys[1], 0) redis.call("pexpire", keys[1], window) len = len + 1else -- 如果长度大于1,获取第二第个元素 local smallwindow1 = tonumber(redis.call("lindex", keys[1], 1)) counter = tonumber(redis.call("lindex", keys[1], 0)) -- 如果该值小于起始小窗口值 if smallwindow1 < startsmallwindow then local count1 = redis.call("lindex", keys[1], 2) -- counter-第三个元素的值 counter = counter - count1 -- 长度-2 len = len - 2 -- 删除第二第三个元素 redis.call("lrem", keys[1], 1, smallwindow1) redis.call("lrem", keys[1], 1, count1) endend-- 若到达窗口请求上限,请求失败if counter >= limit then return 0end -- 如果长度大于1,获取倒数第二第一个元素if len > 1 then local smallwindown = tonumber(redis.call("lindex", keys[1], -2)) -- 如果倒数第二个元素小窗口值大于等于当前小窗口值 if smallwindown >= currentsmallwindow then -- 把倒数第二个元素当成当前小窗口(因为它更新),倒数第一个元素值+1 local countn = redis.call("lindex", keys[1], -1) redis.call("lset", keys[1], -1, countn + 1) else -- 否则,添加新的窗口值,添加新的计数(1),更新过期时间 redis.call("rpush", keys[1], currentsmallwindow, 1) redis.call("pexpire", keys[1], window) endelse -- 否则,添加新的窗口值,添加新的计数(1),更新过期时间 redis.call("rpush", keys[1], currentsmallwindow, 1) redis.call("pexpire", keys[1], window)end -- counter + 1并更新redis.call("lset", keys[1], 0, counter + 1)return 1`
算法都是操作list头部或者尾部,所以时间复杂度接近o(1)
漏桶算法漏桶需要保存当前水位和上次放水时间,因此我们使用hash来保存这两个值。
const leakybucketlimitertryacquireredisscript = `-- argv[1]: 最高水位-- argv[2]: 水流速度/秒-- argv[3]: 当前时间(秒)local peaklevel = tonumber(argv[1])local currentvelocity = tonumber(argv[2])local now = tonumber(argv[3])local lasttime = tonumber(redis.call("hget", keys[1], "lasttime"))local currentlevel = tonumber(redis.call("hget", keys[1], "currentlevel"))-- 初始化if lasttime == nil then lasttime = now currentlevel = 0 redis.call("hmset", keys[1], "currentlevel", currentlevel, "lasttime", lasttime)end -- 尝试放水-- 距离上次放水的时间local interval = now - lasttimeif interval > 0 then -- 当前水位-距离上次放水的时间(秒)*水流速度 local newlevel = currentlevel - interval * currentvelocity if newlevel < 0 then newlevel = 0 end currentlevel = newlevel redis.call("hmset", keys[1], "currentlevel", newlevel, "lasttime", now)end-- 若到达最高水位,请求失败if currentlevel >= peaklevel then return 0end-- 若没有到达最高水位,当前水位+1,请求成功redis.call("hincrby", keys[1], "currentlevel", 1)redis.call("expire", keys[1], peaklevel / currentvelocity)return 1`
package redisimport ( "context" "github.com/go-redis/redis/v8" "time")// leakybucketlimiter 漏桶限流器type leakybucketlimiter struct { peaklevel int // 最高水位 currentvelocity int // 水流速度/秒 client *redis.client // redis客户端 script *redis.script // tryacquire脚本}func newleakybucketlimiter(client *redis.client, peaklevel, currentvelocity int) *leakybucketlimiter { return &leakybucketlimiter{ peaklevel: peaklevel, currentvelocity: currentvelocity, client: client, script: redis.newscript(leakybucketlimitertryacquireredisscript), }}func (l *leakybucketlimiter) tryacquire(ctx context.context, resource string) error { // 当前时间 now := time.now().unix() success, err := l.script.run(ctx, l.client, []string{resource}, l.peaklevel, l.currentvelocity, now).bool() if err != nil { return err } // 若到达窗口请求上限,请求失败 if !success { return erracquirefailed } return nil}
令牌桶令牌桶可以看作是漏桶的相反算法,它们一个是把水倒进桶里,一个是从桶里获取令牌。
const tokenbucketlimitertryacquireredisscript = `-- argv[1]: 容量-- argv[2]: 发放令牌速率/秒-- argv[3]: 当前时间(秒)local capacity = tonumber(argv[1])local rate = tonumber(argv[2])local now = tonumber(argv[3])local lasttime = tonumber(redis.call("hget", keys[1], "lasttime"))local currenttokens = tonumber(redis.call("hget", keys[1], "currenttokens"))-- 初始化if lasttime == nil then lasttime = now currenttokens = capacity redis.call("hmset", keys[1], "currenttokens", currenttokens, "lasttime", lasttime)end -- 尝试发放令牌-- 距离上次发放令牌的时间local interval = now - lasttimeif interval > 0 then -- 当前令牌数量+距离上次发放令牌的时间(秒)*发放令牌速率 local newtokens = currenttokens + interval * rate if newtokens > capacity then newtokens = capacity end currenttokens = newtokens redis.call("hmset", keys[1], "currenttokens", newtokens, "lasttime", now)end-- 如果没有令牌,请求失败if currenttokens == 0 then return 0end-- 果有令牌,当前令牌-1,请求成功redis.call("hincrby", keys[1], "currenttokens", -1)redis.call("expire", keys[1], capacity / rate)return 1`
package redisimport ( "context" "github.com/go-redis/redis/v8" "time")// tokenbucketlimiter 令牌桶限流器type tokenbucketlimiter struct { capacity int // 容量 rate int // 发放令牌速率/秒 client *redis.client // redis客户端 script *redis.script // tryacquire脚本}func newtokenbucketlimiter(client *redis.client, capacity, rate int) *tokenbucketlimiter { return &tokenbucketlimiter{ capacity: capacity, rate: rate, client: client, script: redis.newscript(tokenbucketlimitertryacquireredisscript), }}func (l *tokenbucketlimiter) tryacquire(ctx context.context, resource string) error { // 当前时间 now := time.now().unix() success, err := l.script.run(ctx, l.client, []string{resource}, l.capacity, l.rate, now).bool() if err != nil { return err } // 若到达窗口请求上限,请求失败 if !success { return erracquirefailed } return nil}
滑动日志算法流程与滑动窗口相同,只是它可以指定多个策略,同时在请求失败的时候,需要通知调用方是被哪个策略所拦截。
const slidingloglimitertryacquireredisscripthashimpl = `-- argv[1]: 当前小窗口值-- argv[2]: 第一个策略的窗口时间大小-- argv[i * 2 + 1]: 每个策略的起始小窗口值-- argv[i * 2 + 2]: 每个策略的窗口请求上限local currentsmallwindow = tonumber(argv[1])-- 第一个策略的窗口时间大小local window = tonumber(argv[2])-- 第一个策略的起始小窗口值local startsmallwindow = tonumber(argv[3])local strategieslen = #(argv) / 2 - 1-- 计算每个策略当前窗口的请求总数local counters = redis.call("hgetall", keys[1])local counts = {}-- 初始化countsfor j = 1, strategieslen do counts[j] = 0endfor i = 1, #(counters) / 2 do local smallwindow = tonumber(counters[i * 2 - 1]) local counter = tonumber(counters[i * 2]) if smallwindow < startsmallwindow then redis.call("hdel", keys[1], smallwindow) else for j = 1, strategieslen do if smallwindow >= tonumber(argv[j * 2 + 1]) then counts[j] = counts[j] + counter end end endend-- 若到达对应策略窗口请求上限,请求失败,返回违背的策略下标for i = 1, strategieslen do if counts[i] >= tonumber(argv[i * 2 + 2]) then return i - 1 endend-- 若没到窗口请求上限,当前小窗口计数器+1,请求成功redis.call("hincrby", keys[1], currentsmallwindow, 1)redis.call("pexpire", keys[1], window)return -1`
package redisimport ( "context" "errors" "fmt" "github.com/go-redis/redis/v8" "sort" "time")// violationstrategyerror 违背策略错误type violationstrategyerror struct { limit int // 窗口请求上限 window time.duration // 窗口时间大小}func (e *violationstrategyerror) error() string { return fmt.sprintf("violation strategy that limit = %d and window = %d", e.limit, e.window)}// slidingloglimiterstrategy 滑动日志限流器的策略type slidingloglimiterstrategy struct { limit int // 窗口请求上限 window int64 // 窗口时间大小 smallwindows int64 // 小窗口数量}func newslidingloglimiterstrategy(limit int, window time.duration) *slidingloglimiterstrategy { return &slidingloglimiterstrategy{ limit: limit, window: int64(window), }}// slidingloglimiter 滑动日志限流器type slidingloglimiter struct { strategies []*slidingloglimiterstrategy // 滑动日志限流器策略列表 smallwindow int64 // 小窗口时间大小 client *redis.client // redis客户端 script *redis.script // tryacquire脚本}func newslidingloglimiter(client *redis.client, smallwindow time.duration, strategies ...*slidingloglimiterstrategy) ( *slidingloglimiter, error) { // 复制策略避免被修改 strategies = append(make([]*slidingloglimiterstrategy, 0, len(strategies)), strategies...) // 不能不设置策略 if len(strategies) == 0 { return nil, errors.new("must be set strategies") } // redis过期时间精度最大到毫秒,因此窗口必须能被毫秒整除 if smallwindow%time.millisecond != 0 { return nil, errors.new("the window uint must not be less than millisecond") } smallwindow = smallwindow / time.millisecond for _, strategy := range strategies { if strategy.window%int64(time.millisecond) != 0 { return nil, errors.new("the window uint must not be less than millisecond") } strategy.window = strategy.window / int64(time.millisecond) } // 排序策略,窗口时间大的排前面,相同窗口上限大的排前面 sort.slice(strategies, func(i, j int) bool { a, b := strategies[i], strategies[j] if a.window == b.window { return a.limit > b.limit } return a.window > b.window }) for i, strategy := range strategies { // 随着窗口时间变小,窗口上限也应该变小 if i > 0 { if strategy.limit >= strategies[i-1].limit { return nil, errors.new("the smaller window should be the smaller limit") } } // 窗口时间必须能够被小窗口时间整除 if strategy.window%int64(smallwindow) != 0 { return nil, errors.new("window cannot be split by integers") } strategy.smallwindows = strategy.window / int64(smallwindow) } return &slidingloglimiter{ strategies: strategies, smallwindow: int64(smallwindow), client: client, script: redis.newscript(slidingloglimitertryacquireredisscripthashimpl), }, nil}func (l *slidingloglimiter) tryacquire(ctx context.context, resource string) error { // 获取当前小窗口值 currentsmallwindow := time.now().unixmilli() / l.smallwindow * l.smallwindow args := make([]interface{}, len(l.strategies)*2+2) args[0] = currentsmallwindow args[1] = l.strategies[0].window // 获取每个策略的起始小窗口值 for i, strategy := range l.strategies { args[i*2+2] = currentsmallwindow - l.smallwindow*(strategy.smallwindows-1) args[i*2+3] = strategy.limit } index, err := l.script.run( ctx, l.client, []string{resource}, args...).int() if err != nil { return err } // 若到达窗口请求上限,请求失败 if index != -1 { return &violationstrategyerror{ limit: l.strategies[index].limit, window: time.duration(l.strategies[index].window), } } return nil}
以上就是怎么使用go+redis实现常见限流算法的详细内容。
其它类似信息

推荐信息