监控告警在凌晨三点把我从床上震起来。48 个爬虫节点全部失联,最后一条日志停在两点四十七分,队列里压着六十万条 pending 任务。重启之后更糟:已爬数据在去重集合里找不到痕迹,所有节点从头开始抓。运维同事跑过来说“昨晚爬过的商品又写了一遍数据库”——传统 Scrapy-Redis 在节点崩溃后根本没法精确恢复任务状态。那周光换代理就花了五千块。

半夜两点,爬虫集群又挂了

说白了一句话:Scrapy-Redis 那套基于 RPOPLPUSH 的队列机制,一旦 worker 意外断开,它刚 pop 出来但还没处理完的任务就永久丢失了。你没法知道哪个 URL 正在被哪个节点处理,也分不清“任务超时”和“任务已完成”的边界。去重用的 Redis Set,节点重启后 Set 里记录还在,但队列里那些任务已经没了——结果就是它认为“没爬过”的 URL 又被重新入队。

所以真正要解决的不是怎么把任务分给多个节点——那是调度问题——而是怎么让每个任务在节点挂掉之后,能被人认领、能被人接着干。就好比快递员送件送到一半车坏了,别的快递员得知道他从哪个站点开始、哪些件已经签收、哪些还卡在车厢里。

我们要的是一个能持久化任务状态、能精确断点续爬的队列系统。Redis Streams 的消费者组机制刚好干这事:每条消息有唯一 ID,消费时能手动确认(XACK),没确认的消息在 PENDING 列表里挂着你随时能查。节点重启后调用 XPENDING 看一眼哪些任务还没 ACK,直接重新分配就行,完全不用等全量去重。

配合 FastAPI 写个轻量调度层,把爬虫节点注册成 HTTP 服务,启动时拉一次待恢复任务列表,而不是每次重启都从零 push 全部 URL——这才是能扛住大规模采集的正经思路。

Redis Streams consumer group diagram

为什么是 Streams,不是 List 或 Pub/Sub

前文说 Scrapy-Redis 的 RPOPLPUSH 机制在节点挂掉时会丢任务——那有人问:直接用 Redis List 的 LPUSH + BRPOP 不行吗?或者上 Pub/Sub 广播任务?真,这两个方案在断点续爬场景里,连“能跑”都勉强。

List 队列:pop 出来就等于“已消费”

BRPOP 拿到任务的瞬间,它就从队列里消失了。worker 进程如果在解析 HTML 时被 kill -9、OOM Killer 杀掉、或者 Docker 容器被强制 stop——这条任务彻底蒸发。没有 ID、没有重试标记、没有 pending 列表可查。你甚至没法区分它是“根本没开始处理”,还是“刚写完一半数据库就崩了”。

Pub/Sub:消息不是快递,是烟花

发布即销毁。哪怕你用 PSUBSCRIBE 监听所有频道,只要 consumer 进程断开 100ms,那条任务就永远沉底。Redis 官方文档白纸黑字写着:Pub/Sub 不提供消息持久化,消费者断开后消息永久丢失。它压根不设计用来做任务队列——没持久化、没 ACK、没消费者组、没 XPENDING。拿它做爬虫调度,等于让快递员骑自行车送件,中途不记单号、不签收、不备份运单。

Streams 的消费者组:每条消息自带身份证

XRANGE 查历史,XREADGROUP 拉新任务,XACK 确认完成,XPENDING 查未确认——四条命令闭环。一条消息 ID 是 ,谁消费了、卡在哪、超时多久,全在 Redis 里存着。节点重启后,XREADGROUP GROUP crawler-group worker-02 STREAMS mystream > COUNT 10 就能拉到待恢复任务;XPENDING mystream crawler-group - + 10 能立刻看到哪些 URL 卡在 worker-01 的内存里没 ACK。这才是能落地的断点续爬基座。

FastAPI task scheduler with Redis Streams

用 FastAPI 搭一个分片调度器

上一章聊完 Streams 的消费者组能扛节点崩溃,那下一个问题就是:任务怎么喂进去,还喂得聪明。

你不能所有 URL 都往同一个 Stream 里塞——那跟把全城快递都堆在一个分拣口没区别。worker-01 抢到一条京东商品页,worker-02 抢到一条淘宝详情页,两个节点各自开 HTTP 连接、各自维护代理池、各自解析不同站点模板。没有任何局部性。缓存命中率低、连接池复用不了、反爬策略也打散在各节点里难以统一调整。

分片。分片的核心逻辑就一句话:相似的任务走同一条 Stream。我选域名哈希做第一层分片——hash(domain) % SHARD_COUNT,确保同一个站点的所有 URL 流到同一个消费者组。这样 worker-01 如果被分配了 jd.com 的分片,它就可以跟那个站点的反爬系统“死磕到底”:本地缓存 Cookie、预热 IP 池、保持长连接,甚至专门开一个解析器装京东的商品页面模板。

from fastapi import FastAPI, HTTPException
from redis.asyncio import Redis
import hashlib

app = FastAPI()
redis = Redis.from_url("redis://localhost:6379")
SHARDS = 8
STREAM_PREFIX = "crawl_stream:"

def shard_key(url: str) -> str:
    from urllib.parse import urlparse
    domain = urlparse(url).hostname or "unknown"
    idx = int(hashlib.md5(domain.encode()).hexdigest(), 16) % SHARDS
    return f"{STREAM_PREFIX}{idx}"

@app.post("/submit")
async def submit_task(url: str, priority: int = 1):
    stream = shard_key(url)
    msg_id = await redis.xadd(
        stream,
        {"url": url, "priority": priority, "retry": 0},
        maxlen=100000
    )
    return {"stream": stream, "msg_id": msg_id}

这代码一看就懂。但真正落地时有个坑:maxlen 设多大?我刚开始设了 50000,结果爬某电商大促期间——双十一那会儿——单节点一小时产出 3 万条任务,一个分片队列半天就满了,尾部消息被自动裁剪。XPENDING 查 pending 列表时发现最早的任务 ID 已经没了。后来改成 ~100000,加波浪号是让 Redis 用惰性裁剪,别每写一次就清一次。

另一个细节:优先级怎么塞进 Streams?Streams 本身不支持优先级队列。我这边做法很简单——按优先级分流:高优先级 URL 写 ,普通任务写 。worker 节点先 XREADGROUP 从高优分片读,读到空再切到低优。不需要 ZSET,不需要额外排序。

消费者组注册这部分,FastAPI 的启动事件里做一次就够了。

@app.on_event("startup")
async def init_consumer_groups():
    for shard in range(SHARDS):
        for priority in ("high", "low"):
            stream = f"crawl_stream:{priority}:{shard}"
            try:
                await redis.xgroup_create(stream, "crawler_group", id="0", mkstream=True)
            except Exception as e:
                # 组已存在,跳过
                pass

为什么用 id="0"?因为消费者组第一次创建时,如果指定 $ 就只接收创建之后的新消息——那之前积压的未完成任务就全漏了。设成 0 表示从 Stream 头部开始消费,配合 XPENDING 恢复机制,节点重启后不会丢任何一条。

调度层到这里基本成型。客户端 POST 一个 URL 过来,FastAPI 算它的域名哈希,选一个分片队列,XADD 进去。worker 节点用 XREADGROUP 从自己负责的分片里拉任务,处理完 XACK,挂掉之后 XPENDING 捞回来重试。没有中间件,没有消息代理,没有 ZK 选主——就一个 Redis 实例 + 不到 50 行 Python。

说实话,这套方案我跑了三周之后才敢放线上。最担心的不是 Streams 性能,而是 Redis 单机扛不扛得住每秒几千次 XADD + XREADGROUP。实测下来:32 核的 Redis 7.2,8 个分片,每个分片维持 5 万 pending 消息,QPS 稳定在 18000 左右,CPU 70%。瓶颈反而不是 Redis,是 worker 节点的 aiohttp 连接池。

单点 Redis 一旦挂了,整个爬虫集群直接瘫痪——别担心,这个问题下一节专门拆开来聊。

断点续爬的脊椎骨:Pending 列表

上一章说完了分片注册和 XREADGROUP 的拉取逻辑,但真正让“挂掉不丢任务”这件事落地的,是 Redis Streams 里那个藏得有点深的 Pending 列表——不是靠文档读出来的,是某次 worker 进程被 OOM kill 后,盯着 XPENDING crawl_stream:high:3 crawler_group - + 10 返回的 17 条未 ACK 记录,才意识到它才是断点续爬的脊椎骨。

ACK 不是终点,是起点

worker 处理完一个 URL,调用 redis.xack("crawl_stream:high:3", "crawler_group", message_id)。这行代码干了两件事:把消息从 Pending 列表里删掉;同时告诉 Redis “这个节点已交付”。但关键在前半句——只要没 XACK,这条消息就永远钉在 Pending 列表里,带着时间戳、所属消费者名、重试次数,纹丝不动。

崩溃不是终点,是交接仪式

节点 A 挂了?没关系。节点 B 启动时,先跑一遍 XPENDING 扫描所有分片的 pending 数量。发现 里有 42 条卡了 120 秒以上,立刻用 XCLAIM 把它们“认领”过来:redis.xclaim("crawl_stream:low:1", "crawler_group", "node_b", 120000, ["169876543210-0"])。注意第三个参数必须是新消费者名,否则 Redis 会拒绝——这是强制你显式声明“我接管了”,不是偷偷摸摸抢活。

监控不是可选,是心跳

我们用一个独立的 FastAPI 后台任务,每 30 秒轮询所有分片的 XPENDING ... - + 100。一旦某分片 pending 超过 500 条且平均延迟 > 180s,就触发告警并自动扩容 worker 实例。别信“等它自己恢复”——pending 不会自我消化,只会越积越厚,直到内存告急。

这套机制跑通之后,我才敢把 max_retries=3 从代码里删掉。因为重试逻辑已经下沉到 Redis 层,Python 层只管处理,不操心失败。

实战:kill -9 之后,另一个节点如何无缝接手

光说不练假把式。我们把节点 B 的进程 kill -9 干掉,看看节点 A 能不能真的把活接过来,而且不丢一条 URL。

Kill 之后,Pending 列表里有什么

节点 B 正在处理的 12 条消息,瞬间变成“孤儿”。运行 XPENDING crawl_stream:high:1 crawler_group - + 20,返回的每一行都带着 node_b 的名字和 这样的时间戳。关键字段是 elapsed——从投递到现在的毫秒数。只要这个值超过你设定的超时阈值(比如 120 秒),别的节点就有权接管。

XCLAIM 不是抢,是捡

节点 A 的监控定时器每 15 秒跑一次 XPENDING,发现 node_b 的 pending 里有 8 条已经超时 180 秒。直接调用:

redis.xclaim(
    "crawl_stream:high:1", 
    "crawler_group", 
    "node_a", 
    120000, 
    ["1698876543000-0", "1698876543001-0"]
)

120000 是最小空闲时间(毫秒),只有 pending 超过这个值的消息才会被认领。第三个参数 node_a 是新的消费者名——Redis 要求你明确说“我是谁”,不准匿名捡尸。XCLAIM 返回的消息 ID 列表,就是节点 A 接下来要处理的。它拿到后直接调 process_url(),不需要额外去重,因为 Redis 已经把消息从 node_b 的 pending 里删掉,挂到了 node_a 名下。

零重复的秘密:幂等消费

但有个坑:万一节点 B 已经处理完这条 URL,只是还没来得及 XACK 就挂了?节点 A 会重新爬一遍。解决办法不在 Redis 层,在业务层——每条 URL 的响应数据写入数据库时用 (MySQL)或 setnx(Redis)。同一个 URL 爬两次,第二次只会覆盖响应时间,不会产生重复记录。这才是真正的“零丢失、零重复”。不是靠 Redis 单方面承诺,而是消费端做了最后一道防线。

别让 pending 烂在那里

跑起来之后,记得加上 XLEN 看看每个消费者的 pending 数量。如果某个节点长期不处理又不消失,pending 会垒到几千条,拖慢整个组。我们线上设置了一个告警阈值:任何分片 pending > 200 且平均延迟 > 300s,直接钉钉通知人工介入。这步不做,Redis 内存迟早爆炸,并且排查起来比 OOM 本身还烦。

做完这个实验,我才敢把生产环境的爬虫从单机切过来——节点崩了不是末日,交接仪式写清楚就行。剩下的,全交给 Redis Streams 那个总被忽略的 Pending 列表。

参考与延伸阅读