使用Redis实现IP限制
限制IP请求次数
案例场景:
为避免暴力破解,如果用户输入密码,在 1 秒内连续输错了 5 次,则锁定 ip,锁十分钟。
实现代码如下:
#!/usr/bin/python
# -*- coding: utf-8 -*-
import redis
pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True)
redis_tool = redis.Redis(connection_pool=pool)
def check_ip_is_locked(ip)
key = "black_ip_{}".format(ip)
value = redis_tool.get(key)
if value and int(value) > 5:
redis_tool.set(key, 6, ex=10*60) # 锁定十分钟
return True
return False
def record_ip_request(ip):
key = "black_ip_{}".format(ip)
cnt = redis_tool.incr(key)
if cnt == 1:
redis_tool.expire(key, 1) # 设置过期时间
def post(request):
# 从请求参数中获取 ip
ip = request.data.ip
password = request.data.password
real_password = "123456"
ip_locked = check_ip_is_locked(ip)
if ip_locked:
return "ip locked"
# 校验密码
if password != real_password:
record_ip_request(ip)
return "password error"
一种简单 IP 限流
使用 zset 数据结构,记录用户某个时间段内的行为,如果时间段内的行为次数超过限制的次数,则触发限制。
#!/usr/bin/python
# -*- coding: utf-8 -*-
import redis
pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True)
redis_tool = redis.Redis(connection_pool=pool)
def check_ip_locked(ip, period, max_count):
key = "action_allowed_ip_{}".format(ip)
now_ts = int(time.time() * 1000) # 毫秒时间戳
with redis_tool.pipeline() as pipe: # client 是 StrictRedis 实例
# 记录行为
pipe.zadd(key, now_ts, now_ts) # value 和 score 都使用毫秒时间戳
# 移除时间窗口之前的行为记录,剩下的都是时间窗口内的
pipe.zremrangebyscore(key, 0, now_ts - period * 1000)
# 获取窗口内的行为数量
pipe.zcard(key)
# 设置 zset 过期时间,避免冷用户持续占用内存
# 过期时间应该等于时间窗口的长度,再多宽限 1s
pipe.expire(key, period + 1)
# 批量执行
_, _, current_count, _ = pipe.execute()
# 比较数量是否超标
return current_count <= max_count
def post(request):
# 从请求参数中获取 ip
ip = request.data.ip
password = request.data.password
real_password = "123456"
period = 1
max_count = 5
ip_locked = check_ip_is_locked(ip, period, max_count)
if ip_locked:
return "ip locked"
一种高级 IP 限流
前一种使用 zset 实现的限流对于数据量较小的场景比较适用,但是对于数据量大的高并发请求则对内存消耗比较大,下面使用一种单机漏斗算法来实现限流
import time
class Funnel(object):
def __init__(self, capacity, leaking_rate):
self.capacity = capacity # 漏斗容量
self.leaking_rate = leaking_rate # 漏嘴流水速率
self.left_quota = capacity # 漏斗剩余空间
self.leaking_ts = time.time() # 上一次漏水时间
def make_space(self):
now_ts = time.time()
delta_ts = now_ts - self.leaking_ts # 距离上一次漏水过去了多久
delta_quota = delta_ts * self.leaking_rate # 腾出空间
if delta_quota < 1:
return
self.left_quota += delta_quota # 增加剩余空间
self.leaking_ts = now_ts # 记录漏水时间
if self.left_quota > self.capacity: # 剩余空间不得高于容量
self.left_quota = self.capacity
def watering(self, quota):
self.make_space()
if self.left_quota >= quota: # 判断剩余空间是否足够
self.left_quota -= quota
return True
return False
funnels = {} # 所有的漏斗
# capacity 漏斗容量
# leaking_rate 漏嘴流水速率 quota/s
def is_action_allowed(ip, capacity, leaking_rate):
key = "action_allowed_ip_{}".format(ip)
funnel = funnels.get(key)
if not funnel:
funnel = Funnel(capacity, leaking_rate)
funnels[key] = funnel
return funnel.watering(1)
关注微信公众账号「曹当家的」,订阅最新文章推送