爬虫一跑起来,数据就哗哗往数据库里灌——直到某天发现同一条商品信息存了 17 次,字段缺失的 JSON 占满日志,而 FastAPI 的 还在默默把校验逻辑塞进请求生命周期里。

同步爬虫卡在 requests.get() 上,大家早习惯了。但真把请求和写入拆开,问题才开始露头:HTTP 层只管发,清洗层只管存,中间那层「谁来决定这条数据要不要进库」彻底失焦。重复 ID 不是漏判,是根本没统一入口去查;字段缺失不是校验没写,是校验被扔进了 5 个不同 BackgroundTasks.add_task() 里各自执行。

查了掘金那篇 2026 年 4 月的 FastAPI 性能优化文(链接),也翻了 CSDN 上对比 和 Celery 的实测结论(链接)——原来不是任务该不该异步,而是「异步之后,状态在哪、幂等在哪、错误在哪」全得重想一遍。

选 Celery 不是因为社区喊得响

既然 BackgroundTasks 撑不住,那就得上真家伙。选 Celery 不是因为社区喊得响,而是它的 ack_late 和重试机制刚好能解决「数据丢了谁来管」这个核心问题。架构图其实特简单:FastAPI 只负责收数据、扔给 Redis,Celery Worker 从另一头捞出来慢慢洗。

爬虫那边我砍掉了所有校验逻辑。HTTP 响应拿到手,直接拼成原始 JSON 丢进队列。连字段类型都对不齐的脏数据也照收不误——反正清洗层才是正主。delay() 调用时顺手打上爬虫源 ID 和时间戳,方便消费端做去重判断。这一步要是加校验,就又把耦合捡回来了,白拆。

Worker 从 Redis 里 pop 出任务,走四步:字段格式规整、去重查库、业务校验、写入目标表。每一步都有可能抛出异常,但 Celery 的 max_retries 加指数退避能把临时网络抖动吞掉。核心函数签名大概长这样:

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def clean_and_store(self, raw_data: dict) -> dict:
    dedup_key = f'dedup:{raw_data["source"]}:{raw_data["id"]}'
    if redis_client.exists(dedup_key):
        return {'skipped': True, 'reason': 'duplicate'}
    # 校验、转换、写库...
    redis_client.setex(dedup_key, 86400, '1')
    return {'stored': True}

Redis 那把 TTL 一天的分布式锁,算是去重最轻量的实现。压测时发现 QPS 冲到 800 以上会导致重复概率升高,后来加了 bloomfilter 做预筛才压住。

生产者消费者拆开之后,爽是真爽——爬虫崩了不影响清洗,Worker 扩容直接加实例。但坑也一个没少:消息重复消费、死信队列没人管、Worker OOM 把任务全丢了。Celery 的 得配好,不然任务状态全靠猜。后来我把结果写进 MongoDB,任务失败能直接从 Web 面板点重跑,省了不少盯日志的夜。

说到底,队列不是银弹。它只是把「耦合」换了个地方,从代码层挪到了基础设施层。但比起在 FastAPI 进程里硬扛,这步棋起码让清洗逻辑有了独立演进的资格。

producer consumer pattern with FastAPI and Celery

让 FastAPI 和 Celery 好好握手

把生产者和消费者拆开,听着挺好,真落地的时候第一个坑就是:FastAPI 接到的 raw_data 到底怎么丢进队列?用 BackgroundTasks 只能凑合,进程一挂任务就丢,没法重试,更别谈跨机器跑。得用 Celery,真正持久化、可重试、能扔到另一台机器上干活的方案。

Celery 6.3.0+ 默认要求 Redis 6.2+ 或 RabbitMQ 3.8+;我们用 redis-py 4.6.0 + redis-server 7.0.15 搭的最小闭环。别信文档里那句「推荐 RabbitMQ」——本地开发和中小规模清洗管道,Redis 的 pub/sub + list 做 broker 更轻。配置就两行:

CELERY_BROKER_URL = "redis://localhost:6379/1"
CELERY_RESULT_BACKEND = "redis://localhost:6379/2"

注意:result backend 必须和 broker 分库,否则任务状态会互相污染。

@app.task 直接塞进 FastAPI 的路由文件?Celery Worker 启动时 import 失败当场报错。得单独建 ,并在 中显式注册:

from celery import Celery
app = Celery("crawler_pipeline")
app.config_from_object("celeryconfig")  # 不要硬编码
app.autodiscover_tasks(["tasks.cleaner"])  # 扫描路径必须含包名

clean_and_store.delay(raw_data) 看着省事,但没法设 countdown、expires 或 routing_key。实际清洗场景里,你得给高优爬虫源加优先级队列:

clean_and_store.apply_async(
    args=[raw_data],
    queue="high_priority",
    expires=300,  # 5分钟过期,防脏数据卡死
    retry_kwargs={"max_retries": 2}
)

这行代码跑完,FastAPI 立刻返回 202,不等 Worker 开始执行。真正的解耦,从这里才算开始呼吸。

Celery task deduplication with Redis set

去重那点事,指纹说了算

任务队列跑起来了,FastAPI 返回 202 的速度让人舒坦。但爬虫数据进了队列,不等于万事大吉。

同一个商品页,爬虫在不同时段抓了两遍,两条一模一样的数据先后落进队列。清洗任务处理第一条时一切正常,第二条再来——字段校验通过了,记录库却报主键冲突。抑或更糟:没设唯一索引,记录库里躺着两条完全相同的记录,下游报表直接翻倍。

去重必须在任务消费侧做,而且得做到内存里,不能每次都去数据库查一遍。Redis 的 Set 结构天然适合这个场景:每条数据算出一个指纹,用 SADD 塞进集合,返回 0 就说明已经存在。

最简单的指纹是 URL 去重——爬虫常用。但清洗管道里,数据经过了解析和转换,同一 URL 可能产出不同的清洗结果。我习惯用 hashlib 把关键字段拼接后做摘要:

import hashlib, json

def make_fingerprint(record: dict) -> str:
    # 只取业务唯一性相关的字段,排除时间戳、随机ID
    keys = ["source_url", "product_id", "price", "spec_hash"]
    payload = "|".join(str(record.get(k, "")) for k in keys)
    return hashlib.sha256(payload.encode()).hexdigest()

注意:不要把 crawled_at 这类时间字段放进去,否则同一页面每次抓取指纹都不同,去重形同虚设。

去重逻辑塞在 任务的入口处,指纹检查不通过就直接 return,连数据库连接都不开——省资源:

from redis import Redis
from celery import shared_task

redis_client = Redis.from_url("redis://localhost:6379/3")

@shared_task(bind=True, max_retries=2)
def clean_and_store(self, raw_data: dict):
    fp = make_fingerprint(raw_data)
    # SADD 返回 0 说明已存在
    if not redis_client.sadd("processed_fingerprints", fp):
        return {"status": "duplicated", "fingerprint": fp}
    # 后续清洗、写入逻辑…

SADD 是原子操作,不用先查再写,不会出现并发重复。

爬虫跑一个月,指纹集合几百 GB,内存扛不住。得给指纹设 TTL——但 Redis Set 不能直接对单个成员设过期。我用了两种方案混搭。

方案一:用 EXPIRE 对整个 key 设过期,适合周期性全量爬虫。每次新周期开始,换个 key 名(比如 ),旧 key 自然过期:

key = f"processed_fingerprints:{date.today().isoformat()}"
redis_client.sadd(key, fp)
redis_client.expire(key, 86400 * 3)  # 保留 3 天

方案二:用 Sorted Set 替代 Set,把时间戳当 score,取指纹时只查最近 N 小时的数据。定时任务清理低分成员。这招适合增量爬虫,数据流动不按天切分。

别图省事把 TTL 设成永久。我见过有人设了 expire=0,三天后 Redis 内存暴涨,OOM kill 直接把 Celery 连坐了。

字段校验如果放在去重之后,一条脏数据被校验拒绝、没写入 DB,但指纹已经塞进了 Redis。下次相同数据再来,校验逻辑还没跑就被跳过了——永远丢失。正确顺序是:先校验,通过后生成指纹再去重。

def clean_and_store(self, raw_data):
    validated = validate_schema(raw_data)  # 先校验
    if not validated:
        return {"status": "invalid"}
    fp = make_fingerprint(validated)
    if not redis_client.sadd(key, fp):
        return {"status": "duplicated"}
    # 写入数据库…

校验通过才算“管用数据”,指纹只对有效数据做去重。脏数据放进来也没用,反而填塞集合。Redis 去重不是什么高深技巧,但指纹策略和过期方案一旦选错,回头改数据清洗逻辑的代价比写代码时多十倍。

Pydantic 帮你拦住八成脏数据

去重搞定了,但脏数据进来的问题还在。爬虫抓到的字段经常缺胳膊少腿——价格字段返回了"价格待询"这种字符串,发布时间是"刚刚"要么"三天前",以至于直接把 HTML 标签带进来。这些东西写进数据库就是灾难,下游报表直接炸。

我一开始用了一堆 if-else 做校验,大概写了四五十行,后来加了几个新字段,if-else 直接膨胀到没法看。换 Pydantic 模型吧,FastAPI 原生就带的东西,不用白不用。

一个商品数据最终入库的结构长这样:

from pydantic import BaseModel, Field, validator
from datetime import datetime
from typing import Optional

class CleanedProduct(BaseModel):
    product_id: str = Field(..., alias="id")
    title: str = Field(..., min_length=1, max_length=200)
    price: float = Field(..., ge=0)
    publish_time: datetime
    source_url: str = Field(..., alias="url")
    category: Optional[str] = None
    raw_text: Optional[str] = None

    @validator("price", pre=True)
    def parse_price(cls, v):
        if isinstance(v, str):
            # 去掉"¥"、"元"、"价格面议"这类东西
            cleaned = v.replace("¥", "").replace("元", "").strip()
            if cleaned in ("价格面议", "待询", ""):
                return 0.0
            return float(cleaned)
        return v

    @validator("publish_time", pre=True)
    def parse_time(cls, v):
        if isinstance(v, str):
            # "刚刚"、"3分钟前"这类相对时间转成绝对时间
            if v == "刚刚":
                return datetime.now()
            if "分钟前" in v:
                minutes = int(v.replace("分钟前", ""))
                return datetime.now() - timedelta(minutes=minutes)
            # 其他格式用 dateutil 兜底
            return parser.parse(v)
        return v

Fieldalias 映射原始字段名,validator 做类型转换。price 字段必须浮点数且大于等于 0,publish_time 必须转成 datetime。脏数据在构造时就会抛 ,根本进不到下一步。

去重之前先校验,这个顺序我踩过坑,上一章结尾已经说了。实际代码长这样:

from pydantic import ValidationError

@app.task(bind=True, max_retries=3)
def process_raw_product(self, raw_data: dict):
    try:
        product = CleanedProduct(**raw_data)
    except ValidationError as e:
        # 校验失败,丢进死信队列
        dead_letter_queue.send(
            reason="validation_failed",
            raw_data=raw_data,
            errors=e.errors()
        )
        return {"status": "invalid", "errors": e.errors()}

    fp = make_fingerprint(product)
    if not redis_client.sadd(f"fingerprints:{date.today()}", fp):
        return {"status": "duplicated"}

    db.insert(product.dict(by_alias=True))
    return {"status": "stored"}

校验没通过的数据不会生成指纹,也不会挤进去重集合,直接丢到死信队列里。这个死信队列我单独用了一个 Redis List 来装,每天定时跑一次脚本,人工翻一翻,看看到底是爬虫解析写岔了,还是源站偷偷改了字段格式。

遇到过最离谱的情况:某个电商把价格字段从 price 改成了 salePrice,模型校验全部失败,一天积累了三千多条死信。要不是死信队列兜着,那些数据就悄无声息丢了,业务方还以为是没爬到。

校验模型的另一个好处:字段别名统一。原始 JSON 里字段名可能是驼峰、下划线、甚至拼音缩写,通过 alias 全转成 Python 风格,写入数据库时再用 by_alias=True 转回去。不用在代码里到处写 raw_data.get("product_name", raw_data.get("productName", "")) 这种破东西。

最后说一句:Pydantic v2 的校验速度比 v1 快了好几倍,能升就升。别用 v1 的 @validator 装饰器写太多复杂逻辑,v2 的 @field_validator 更清晰,但核心思路一样——让模型自己管好自己的字段,别让业务代码替它操心。

性能优化那点破事

上线第三天凌晨,Celery Worker 的 CPU 突然卡在 98%,但日志里没报错——只有一堆 RECEIVED 没后续。查了半小时才发现:默认的 worker_concurrency=1 被我漏改了,4 核机器硬是跑成单线程串行处理。

我们把 celery -A app.celery_app worker --concurrency=8 写进 systemd service,又加了 --max-tasks-per-child=1000 防内存泄漏。实测从每秒吞 3 条涨到 27 条,不是线性提升——瓶颈很快移到数据库写入上。

Flower 启起来后,flower -A app.celery_app --port=5555,一眼就能看到哪个任务卡在 RECEIVED 状态、哪台 worker 连不上 Redis Broker。有次发现所有失败任务都卡在 db.insert(),顺藤摸瓜查出 SQLAlchemy 连接池没配 pool_pre_ping=True,连接超时后没自动重连。

把单条 db.insert(product.dict(by_alias=True)) 改成每 50 条一批:db.execute(insert(ProductModel).values(batch))。PostgreSQL 的 INSERT ... VALUES (...), (...), (...) 批量语法,让写入耗时从平均 120ms 降到 18ms。连接数也稳住了——再也不用半夜被 Prometheus 告警叫醒。

填完这些坑回头一看,真没什么玄乎的技术,全是默认值设的陷阱。可每解决一个,管道就硬气一分。