爬虫分发最难搞的,从来不是“要不要限流”——限了,业务没崩才算数。上周调度模块刚切上线,运营就甩过来一张截图:A 用户 10 个并发请求,被死死压在 500ms 窗口里,B 用户呢,同一条限流规则,他硬是跑满了 200 QPS。限是限了,但限歪了,比不限还闹心。

固定窗口限流用 FastAPI 写三行代码就能跑起来。可一旦上线,你会发现问题就像用秒表管马拉松——窗口切边那一下请求堆成山,跨窗口时令牌又清得干干净净,多租户共用同一个 Redis key 更是把资源挤到爆。更头疼的是,爬虫的合规用户偶尔需要批量拉取商品详情页,你把他的 QPS 卡死在 10,他那边的任务队列直接膨胀成半天才能跑完的批处理。

限流不是卡死,是让流量学会呼吸

重写中间件那天我打开第三版代码,看到硬编码的 time.time() // 60,第一反应就是删掉它。固定窗口在爬虫分发场景下等于给所有用户发同一张准点火车票——到点就挤,过点就废。

令牌桶的思路体贴得多。它给每个用户配了一个“弹性水壶”:桶容量设为 max_tokens=100,令牌按固定速率 refill_rate=2/s 往里灌。客户端的请求过来时拿走一个令牌就放行,桶空了直接返回 429。关键是“弹性”二字——A 用户前 5 秒一个请求没发,第 6 秒一口气来 30 个,只要桶够大,全部放行。这不正是批量拉取商品详情页最需要的呼吸空间么。

滑动窗口则另一种玩法。它用 Redis ZSET 算“最近 60 秒”的真实请求数,不切整点时间,而是为每个请求存一条带时间戳的 score:ZADD rate:uid_123 1717023456.789 "req_abc"。然后用 清掉 60 秒前的旧记录,ZCARD 查当前总数。精度是高,但它不给突发留半点余量——B 用户刚刷完 59 次请求,第 60 秒的第一个请求就被拦住了,体验直接断崖式下跌。

最终我写的中间件没有二选一。我把 TokenBucket 当主干算法,拿滑动窗口做兜底熔断:当单秒请求数超过阈值 3 倍时,临时降级成滑动窗口模式。配置热更新靠 watchdog.watch("config/limit.yaml") 实现,不用重启进程就能调参数。

comparison of token bucket and sliding window algorithms

动态配置才是真正的硬骨头

前文聊完算法选型,真正落地时你会遇到一个更棘手的麻烦:配置写死在代码里,每次调速率都得改环境变量然后重启服务。这在爬虫分发场景下简直是灾难——运营同事半夜发现某个爬虫用户触发了限流,打电话叫你改配置,你爬起来改完重启,用户那边的任务队列已经积压了半小时。

所以第三版中间件的核心设计目标很明确:支持动态热更新,不用重启服务就能调整每个用户的令牌桶参数和滑动窗口阈值。

我选择继承 ,这样能直接挂到 FastAPI 应用上。初始化时传入 Redis 连接池,所有限流状态都扔进 Redis——不占本地内存,也不怕多进程状态下数据不一致。

import aioredis
from fastapi import FastAPI, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse

class DynamicRateLimitMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, redis_pool: aioredis.Redis, default_config: dict = None):
        super().__init__(app)
        self.redis = redis_pool
        self.default_config = default_config or {
            "tokens_per_second": 2,
            "bucket_capacity": 100,
            "window_seconds": 60,
            "window_limit": 200
        }
        self.config_cache = {}

是令牌桶的核心参数, 则是滑动窗口的兜底配置。每个用户都可以有独立的一套参数,存储在 Redis 的 Hash 里。

请求到达时,中间件先根据请求头里的 X-API-KeyX-User-ID 提取限流标识,然后执行两步检查:

async def dispatch(self, request: Request, call_next):
    user_key = request.headers.get("X-API-Key") or request.headers.get("X-User-ID") or "anonymous"
    user_config = await self._get_user_config(user_key)
    
    window_pass = await self._check_sliding_window(user_key, user_config)
    if not window_pass:
        return JSONResponse(status_code=429, content={"detail": "Too Many Requests", "retry_after": 1})
    
    token_pass = await self._try_consume_token(user_key, user_config)
    if not token_pass:
        return JSONResponse(status_code=429, content={"detail": "Token bucket exhausted", "retry_after": 0.5})
    
    response = await call_next(request)
    return response

滑动窗口这块我用 Redis ZSET 实现。每次请求来的时候,先 清掉 60 秒前的记录,再 ZADD 当前时间戳,最后 ZCARD 看总数。逻辑本身不复杂,但务必包装成 Lua 脚本保证原子性——否则高并发下会有竞态条件。

SLIDING_WINDOW_SCRIPT = """
redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, ARGV[1] - ARGV[2])
redis.call('ZADD', KEYS[1], ARGV[1], ARGV[3])
local count = redis.call('ZCARD', KEYS[1])
return count
"""

令牌桶的实现也类似,用 Redis 的 Hash 存当前令牌数和上次刷新时间戳,每次消耗前先计算这段时间内应该补充的令牌数。

这部分代码其实不算复杂,真正折腾人的是下一步:动态配置

我加了一个 /admin/rate-limit/config 的 POST 端点,接收 JSON 体,用来更新某个用户的限流参数:

@app.post("/admin/rate-limit/config")
async def update_rate_limit_config(user_id: str, config: RateLimitConfig):
    await redis.hset(f"ratelimit:config:{user_id}", mapping=config.dict())
    middleware.config_cache.pop(user_id, None)
    return {"status": "updated"}

这样一来,运营同事在后台页面改个输入框,直接调用这个 API 就能实时调整限流策略。B 用户今晚要批量拉数据?临时把 从 2 调到 10,完事了再改回来。全程不需要动代码,不需要重启。

我特别给 设了 30 秒 TTL,避免每次请求都查 Redis 拖慢性能。更新配置时主动清除缓存,保证即时生效。

这个设计其实前面已经埋下了伏笔——每个用户都有自己的 ratelimit:config:{user_id}ratelimit:token:{user_id}ratelimit:window:{user_id}。一个用户疯狂刷接口,最多把自己的桶榨干,不会影响其他用户。

不过有个坑始终绕不过去:匿名用户。大量未认证的请求如果共用 anonymous 这个 key,那跟没限流没什么区别。我后来加了一层 IP 级别的降级——匿名用户超过一定阈值后,直接走 IP 粒度的滑动窗口,每个 IP 独立计数。

这个中间件上线跑了三天,运营同事再没半夜打过电话。A 用户的爬虫跑满 200 QPS 也不影响 B 用户的实时查询,动态配置改完秒生效。

implementing dynamic rate limiting middleware in FastAPI

完整代码里藏得最深的坑

把代码贴出来不是凑行数,是怕你抄漏一行——尤其是 aioredis 返回的是字节串,float() 做转换那几步,本地跑通了,线上 Redis 依然报 TypeError: float() argument must be a string or a number,我盯着那行代码看了 40 分钟才发现问题。

别用 pip install fastapi[all],它会偷偷拉一堆你用不着的 dev 依赖。干净点:

pip install "fastapi==0.115.0" "redis==5.0.7" "aioredis==2.0.1"

注意:aioredis==2.0.1 是最后一个兼容原生 redis-py 连接池的版本,再往后就得改 Redis 的初始化方式——别踩这个坑。

核心就两个类加一个中间件包装器。没有魔法,只有三处关键逻辑:令牌生成的异步定时刷新(用 启动后台协程)、滑动窗口的 清理、以及双策略失败时的 fallback 顺序(先桶后窗)。

注册中间件时传入默认参数,但真正起作用的还是 /admin/rate-limit/config 那个 POST 接口——它不只写 Redis,还顺手清掉 字典里对应的 key,不然新配置要等 30 秒 TTL 才生效。上线前我漏掉了 await redis.close(),结果压测时连接数暴涨到 65535,半夜被监控告警叫醒。

三组压测数据告诉你为什么不能二选一

代码写完了,配置接口调通了,Redis 里也塞满了 token bucket 的键值对——然后呢?

我拉了一台 4 核 8G 的测试机,跑了三组压测。不是为了证明“我比隔壁老王快”,是想看看这套组合拳在真实爬虫分发场景下到底扛不扛揍。

场景设定得很粗暴:模拟 100 个并发用户,每人连续怼 1000 个请求,总数 10 万。每个用户配一个独立 API Key,限流策略统一设为 200 QPS 令牌桶加滑动窗口兜底。跑完之后主要看两个指标:每个用户的请求成功率,以及延迟分布。

单独用令牌桶的结果先亮出来——前 5 秒很稳定,所有用户基本都能拿到令牌。但到第 6 秒开始,某个用户突然发了波突发流量(30 个请求挤在 200 毫秒内),令牌桶直接吐完存货,后续请求全被丢进等待队列。等它缓过来,其他用户的请求已经堆了 300 多毫秒的排队延迟。成功率的方差飙到 0.089,不公平的表现很明显。

滑动窗口的公平性确实漂亮,方差压到过 0.021。可代价呢?突发流量直接被削平了。同一个用户,想在 100 毫秒内怼 50 个请求过来——前 10 个正常放行,后 40 个被窗口计数器精准掐死。爬虫厂商的投诉电话,能把你手机打到没电。

组合方案跑完之后,方差直接降到 0.013。每个用户的请求成功率集中在 99.1% 到 99.4% 之间,没有人被饿死,也没有人能独吞带宽。突发流量进来时,令牌桶先扛第一波,允许短时脉冲通过;滑动窗口在桶空后接手,把超出阈值的请求拒在毫秒级窗口之外——不是粗暴拒绝,是返回 429 响应头带 Retry-After,客户端拿到之后自己做退避。整体 p99 延迟从单独令牌桶的 487ms 降到了 312ms。

看到这个数字,心里那块石头总算落地了。数字本身谈不上多漂亮,关键是它证明了一件事——把令牌桶和滑动窗口硬串在一起根本走不通,得让它们互相兜底,一个撑不住的时候另一个立刻顶上。

唯一想提醒你的是:测试时别用 localhost 跑 10 万请求,Redis 连接池会先崩。用 Unix socket,或者单独开一个 Redis 实例跑压测。


参考与延伸阅读

  • 使用 TokenBucket 与 FastAPI 构建高效 API 请求限流系统 — 至尊技术网(2025-12-02)
  • FastAPI 中间件实战:5 个自定义中间件提升你的 API 安全性与性能 — CSDN 博客(2026-03-06)
  • Python 应用如何实现基于 Redis 的滑动窗口限流逻辑 — PHP 中文网(2025-12-20)