使用Redis实现IP限制

限制IP请求次数

案例场景:

为避免暴力破解,如果用户输入密码,在 1 秒内连续输错了 5 次,则锁定 ip,锁十分钟。

实现代码如下:

#!/usr/bin/python
# -*- coding: utf-8 -*-
import redis

pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True)
redis_tool = redis.Redis(connection_pool=pool)


def check_ip_is_locked(ip)
    key = "black_ip_{}".format(ip)
    value = redis_tool.get(key)
    if value and int(value) > 5:
      	redis_tool.set(key, 6, ex=10*60)	# 锁定十分钟
      	return True
    return False


def record_ip_request(ip):
    key = "black_ip_{}".format(ip)
    cnt = redis_tool.incr(key)
    if cnt == 1:
      	redis_tool.expire(key, 1)	# 设置过期时间


def post(request):
    # 从请求参数中获取 ip
    ip = request.data.ip
    password = request.data.password
    real_password = "123456"
    
    ip_locked = check_ip_is_locked(ip)
    if ip_locked:
      	return "ip locked"
    
    # 校验密码
    if password != real_password:
      	record_ip_request(ip)
      	return "password error"

一种简单 IP 限流

使用 zset 数据结构,记录用户某个时间段内的行为,如果时间段内的行为次数超过限制的次数,则触发限制。

#!/usr/bin/python
# -*- coding: utf-8 -*-
import redis

pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True)
redis_tool = redis.Redis(connection_pool=pool)


def check_ip_locked(ip, period, max_count):
    key = "action_allowed_ip_{}".format(ip)
    now_ts = int(time.time() * 1000)  # 毫秒时间戳
    with redis_tool.pipeline() as pipe:  # client 是 StrictRedis 实例
        # 记录行为
        pipe.zadd(key, now_ts, now_ts)  # value 和 score 都使用毫秒时间戳
        # 移除时间窗口之前的行为记录,剩下的都是时间窗口内的
        pipe.zremrangebyscore(key, 0, now_ts - period * 1000)
        # 获取窗口内的行为数量
        pipe.zcard(key)
        # 设置 zset 过期时间,避免冷用户持续占用内存
        # 过期时间应该等于时间窗口的长度,再多宽限 1s
        pipe.expire(key, period + 1)
        # 批量执行
        _, _, current_count, _ = pipe.execute()
    # 比较数量是否超标
    return current_count <= max_count    


def post(request):
    # 从请求参数中获取 ip
    ip = request.data.ip
    password = request.data.password
    real_password = "123456"
    
    period = 1
    max_count = 5
    ip_locked = check_ip_is_locked(ip, period, max_count)
    if ip_locked:
      	return "ip locked"
    

一种高级 IP 限流

前一种使用 zset 实现的限流对于数据量较小的场景比较适用,但是对于数据量大的高并发请求则对内存消耗比较大,下面使用一种单机漏斗算法来实现限流

import time

class Funnel(object):

    def __init__(self, capacity, leaking_rate):
        self.capacity = capacity  # 漏斗容量
        self.leaking_rate = leaking_rate  # 漏嘴流水速率
        self.left_quota = capacity  # 漏斗剩余空间
        self.leaking_ts = time.time()  # 上一次漏水时间

    def make_space(self):
        now_ts = time.time()
        delta_ts = now_ts - self.leaking_ts  # 距离上一次漏水过去了多久
        delta_quota = delta_ts * self.leaking_rate  # 腾出空间
        if delta_quota < 1:
            return
        self.left_quota += delta_quota  # 增加剩余空间
        self.leaking_ts = now_ts  # 记录漏水时间
        if self.left_quota > self.capacity:  # 剩余空间不得高于容量
            self.left_quota = self.capacity

    def watering(self, quota):
        self.make_space()
        if self.left_quota >= quota:  # 判断剩余空间是否足够
            self.left_quota -= quota
            return True
        return False


funnels = {}  # 所有的漏斗

# capacity  漏斗容量
# leaking_rate 漏嘴流水速率 quota/s
def is_action_allowed(ip, capacity, leaking_rate):
    key = "action_allowed_ip_{}".format(ip)
    funnel = funnels.get(key)
    if not funnel:
        funnel = Funnel(capacity, leaking_rate)
        funnels[key] = funnel
    return funnel.watering(1)

 


关注微信公众账号「曹当家的」,订阅最新文章推送

Table of Contents