搞爬虫对抗或者邮件安全审计的,迟早都会撞上同一个死胡同:你看到的发件人,根本不是真正的发件人。屏幕上明晃晃写着“support@chase.com”或“no-reply@paypal.com”,但你心里清楚,那层皮底下藏着的是一个真实的 IP、一台物理机器,可能窝在某座城市边缘的机房里,甚至更离谱——只是一台挂着家用宽带的华硕路由器。

邮件头这东西,大部分人一辈子都不会点开看。只要邮件客户端显示“来自张三”,正文、附件都在,那就够了。但你要是顺着邮件头的结构往下挖,会发现每一台经手的服务器都留下了自己的签名——Received 头。一个标准的 Received 头长这样:

Received: from mail-ot1-x32.google.com (mail-ot1-x32.google.com. [2607:f8b0:4864:20::32])
        by mx.example.com with ESMTP id 5x3f2a
        for <target@example.com>; Wed, 15 Mar 2025 14:22:10 +0000 (UTC)

每一跳都追加一条 Received,并且是追加在最顶部。所以读的时候要从上往下读——最后一条才是离发件人最近的那台服务器。实际场景里,垃圾邮件发送者和爬虫代理隧道会在这个链里面插假头。你见过一封自称来自 Gmail 的邮件,它的 Received 链最上面一条指向一个 192.168 开头的内网 IP 吗?我见过。那条链直接暴露了发件人用的是家庭宽带路由器上的一个 PHP 脚本。

另外一个宝藏字段是 。这个头不是所有邮件都带,但某些邮件服务(比如老版本的 Outlook Web Access、部分自建邮件系统)会毫无保留地把原始发件 IP 写进去。你都不用解析 Received 链,直接读这个字段就能拿到那个人的真实宽带 IP。

Python 标准库的 email 包就能处理这事。用 BytesParser 解析原始邮件文本,然后遍历 get_all('Received') 拿到所有跳转记录。每条记录里用正则把 IP 抠出来——IPv4、IPv6 都要处理。不要只取第一个 IP,要检查整条链的连续性。如果一条链里 IP 的地理位置从上海跳到纽约再跳回上海,那基本可以断定中间有伪造头或者代理隧道在作祟。

import re
from email import policy
from email.parser import BytesParser

def extract_received_chain(raw_email: bytes):
    msg = BytesParser(policy=policy.default).parsebytes(raw_email)
    received_list = msg.get_all('Received') or []
    ips = []
    ip_pattern = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')
    for entry in received_list:
        match = ip_pattern.search(entry)
        if match:
            ips.append(match.group())
    return ips  # 按邮件头部顺序,第一条是最新一跳

我拿这个脚本跑过一批“钓鱼邮件样本”,其中一封表面上发件服务器是 ,但 Received 链里倒数第二跳暴露了一个 101.78.xxx.xxx 的 IP。查了一下,那个 IP 归属某省电信的宽带池,跟 Gmail 没有任何关系。这种信号一出现,基本可以判定那封邮件要么是伪造的,要么是发件人通过某个被黑的跳板机在发。

给每个IP贴上位置标签,拦截异常区域

拿到 Received 链里的 IP 后,光看归属 ASN 不够——很多代理隧道用的都是阿里云香港、OVH 法兰克福、DigitalOcean 纽约这些“标准答案”IP。真正有用的判断依据,是地理位置之间的逻辑断裂。

MaxMind GeoLite2 City(注意:不是旧版 GeoLite Legacy)能返回 city_name、country_iso_code、latitude、longitude。我用的是 geoip2.database.Reader('GeoLite2-City.mmdb'),版本 0.14.0,它对 IPv6 的城市级定位比纯 DNS 反查靠谱得多。关键不是“这个 IP 在哪”,而是“它和上一跳的距离是否合理”。比如从上海发信,下一跳却落在圣保罗,中间还隔了 19800km——这链路物理上根本跑不通,除非是伪造头或隧道中转。

阈值不是拍脑袋定的。500km 是我们线上灰度时设的初始值。后来发现:国内跨省邮件跳转常在 800–1200km(比如北京→广州),但代理隧道集中在几个数据中心,同一机房内 IP 的经纬度偏差普遍小于 0.5km。所以实际策略是双轨并行:跳转距离 >1500km 标为高风险;连续两跳 IP 经纬度差 <1km 且 ASN 不同,触发代理隧道嫌疑(比如某跳是 AS45102,下跳是 AS16276,但坐标完全重合)。

from geopy.distance import geodesic
def is_suspicious_hop(ip1: str, ip2: str, reader: geoip2.database.Reader) -> bool:
    try:
        loc1 = reader.city(ip1)
        loc2 = reader.city(ip2)
        dist_km = geodesic(
            (loc1.location.latitude, loc1.location.longitude),
            (loc2.location.latitude, loc2.location.longitude)
        ).kilometers
        return dist_km > 1500 or (
            dist_km < 1 and loc1.traits.autonomous_system_number != loc2.traits.autonomous_system_number
        )
    except Exception:
        return False

这个函数现在嵌在 FastAPI 的 Depends(verify_email_geo_fence) 里,不加缓存也能扛住 300QPS。说实话,第一次看到某封“来自杭州”的邮件,Received 链里连续三跳都落在法兰克福同一个 /24 子网,经纬度完全一致——那一刻我就知道,代理隧道真不是理论问题。

IP geolocation map with distance threshold

当IP来自数据中心或已知代理节点

GeoIP 距离检查筛掉了物理上不可能的跳转,但还有一类更难缠的:IP 本身是干净的,地理位置也合理,可它属于 AWS 光帆、Azure 虚拟机或某家不知名的“住宅代理”提供商。

说白了,发件服务器跑在云上,跟企业内网半点关系没有。

Received 链里如果出现这类 IP,基本可以断定邮件经过了代理隧道。隧道本身不一定是恶意的——很多正经公司的邮件中继也托管在云上。但问题是,如果邮件声称来自 “mail.corp-a.com”,而 Received 头里第一跳的 IP 属于 DigitalOcean 的一个 droplet,ASN 标注是 “DIGITALOCEAN-ASN”,那这封邮件的来源就值得怀疑。

我在生产环境里维护了一个双重检测机制:

  • 第一层,用 ip2proxy 库的 Python 包装器(IP2Proxy 类)对每个 Received IP 做代理类型判断。它返回的不只是“是/否”,还能区分是 VPN、公共代理、数据中心还是 Tor 出口节点。
  • 第二层,拉取 MaxMind 的 ASN 数据库,对照已知的云服务商 ASN 列表。如果 IP 的 ASN 属于某个托管服务商,而邮件声称来自非云化的企业内网,就触发告警。
import IP2Proxy
import geoip2.database

proxy_reader = IP2Proxy.IP2Proxy()
proxy_reader.open("./IP2PROXY-IP-PROXYTYPE.BIN")

asn_reader = geoip2.database.Reader("./GeoLite2-ASN.mmdb")

CLOUD_ASN_LIST = {14618, 16509, 15169, 8075, 16509}  # AWS, DigitalOcean, Google, Microsoft, Rackspace

def check_proxy_tunnel(ip: str, claimed_org: str) -> bool:
    # 第一层:代理类型检测
    proxy_result = proxy_reader.get_all(ip)
    if proxy_result["proxy_type"] not in ("-", "DCH"):
        return True  # 明确是 VPN/公共代理
    
    # 第二层:ASN 归属检测
    try:
        asn = asn_reader.asn(ip)
        if asn.autonomous_system_number in CLOUD_ASN_LIST and "cloud" not in claimed_org.lower():
            return True
    except Exception:
        pass
    return False

这段逻辑写进 FastAPI 的 Depends(verify_origin) 后,线上灰度跑了两周才调稳。最大的坑是误报:某家 SaaS 厂商的邮件中继恰好部署在 AWS 新加坡区域,ASN 命中,但人家确实是正经业务邮件。最后加了白名单机制,把已知的 SaaS 邮件中继 IP 范围单独放行。

另一个有意思的发现是:很多代理隧道会刻意选用同一数据中心的不同 /24 子网,让 IP 看起来“分散”。但 ASN 和经纬度同时暴露了它们——同一机房的跳转距离小于 0.5km,ASN 却完全一样。这时候距离检测和代理检测双管齐下,几乎一抓一个准。

写代码的时候别迷信单一库。ip2proxy 对住宅代理的覆盖率并不完美,我踩过它漏掉某家欧洲住宅代理商的坑。ASN 检测反而补上了这个缺口——那家代理商的 IP 段恰好属于一个荷兰的托管公司,ASN 是自家的。组合拳比单点靠谱。

Proxy tunnel detection with data center icons

SPF、DKIM与反向DNS验证

收到一封“来自 support@alipay.com”的邮件,Received 头里却写着 Received: from [192.168.1.100] —— 这不是支付宝的 MX,也不是它的 SPF 记录授权 IP。它只是个被塞进邮件头的幻影。

SPF 不是摆设,但得亲手跑一遍。spf 库(v3.4)能解析 TXT 记录并比对发件 IP,但别直接信 spf.check_host()'pass' 返回值。我们线上遇到过一次:某企业邮箱用第三方 SMTP 中继,SPF 记录漏加 include 语句,结果所有合法邮件全标“softfail”。后来改成只拦截 'fail''neutral',再结合 DKIM 做二次兜底。

DKIM 验证失败?先看签名头有没有被 Received 挤掉。dkimpy(v1.2.3)要求原始邮件体 + 完整头(含 From、To、Subject),但很多爬虫伪造邮件时会把 DKIM-Signature 放在中间,而后续 Received 头又追加在最顶——导致签名覆盖范围错位。我们加了预处理逻辑:email.message_from_bytes(raw_bytes, policy=email.policy.default.clone(linesep='\r\n')),强制保留原始换行和顺序。

PTR 不匹配 ≠ 一定伪造,但匹配得上才敢信。对每个 Received IP 做 socket.gethostbyaddr(),再反查该域名的 A 记录是否回指原 IP。曾经发现某家爬虫平台用 作跳板,PTR 是它,A 却指向柏林机房;而真实发信域名 alipay.com 的 MX 解析 IP 根本不在这个 ASN 下。这种链路断裂,比单纯“无 PTR”更值得标记。

FastAPI集成邮件头分析与告警

前面把 IP 地理围栏、代理检测、SPF/DKIM 验证一个个拆开讲了,但生产环境里没人手动跑脚本。得把它拧成一根管子——收到一封邮件,几秒内吐出风险评分、溯源路径、该不该告警。

FastAPI 3.0+ 的异步特性正好干这事。每个检测步骤之间没有强依赖:查 IP 地理位置不用等 SPF 验证完,DKIM 签名检查可以和反向 DNS 查询并发跑。用 asyncio.gather() 把四个协程扔出去,谁先回来都行,最后汇总。

接口设计上,别只收 .eml,也要收 JSON。我见过两种调用方:一种是邮件网关直接把原始 .eml 文件扔过来,另一种是第三方服务只解析了邮件头、以 JSON 格式传过来。两种都得接。

from fastapi import FastAPI, UploadFile, File, Form
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

class EmailHeaderInput(BaseModel):
    received_headers: list[str]
    from_addr: str
    to_addr: str
    subject: str
    dkim_signature: Optional[str] = None
    spf_result: Optional[str] = None

@app.post("/analyze/raw")
async def analyze_raw_email(file: UploadFile = File(...)):
    raw_bytes = await file.read()
    return await run_full_analysis(raw_bytes)

@app.post("/analyze/json")
async def analyze_json_email(data: EmailHeaderInput):
    return await run_analysis_from_headers(data)

异步读文件这步容易被忽略。await file.read() 而不是 file.file.read(),否则上传大附件时卡住事件循环。

核心函数用 asyncio.gather() 并发执行四个子检测,每个返回自己的风险分加证据链,最后加权汇总。

async def run_full_analysis(raw_bytes: bytes) -> dict:
    msg = email.message_from_bytes(
        raw_bytes,
        policy=email.policy.default.clone(linesep='\r\n')
    )
    headers = extract_all_received(msg)
    tasks = [
        geo_ip_check(headers),           # 地理位置
        proxy_vpn_detect(headers),        # 代理隧道
        spf_dkim_verify(msg, headers),    # SPF + DKIM
        ptr_reverse_lookup(headers)       # 反向DNS
    ]
    results = await asyncio.gather(*tasks)

    risk_score = weighted_risk_score(results)
    evidence = flatten_evidence(results)

    if risk_score > 80:
        await trigger_webhook(risk_score, evidence)
    elif risk_score > 50:
        log_to_audit_table(risk_score, evidence)
    
    return {
        "risk_score": risk_score,
        "evidence": evidence,
        "action": "block" if risk_score > 80 else "monitor"
    }

加权权重我调过三版。最开始给 SPF 失败赋了 40 分,结果某次某电商大促时,他们的第三方邮件中继没配 SPF 记录,全被拦了。后来改成 DKIM 失败只扣 15 分,SPF 的 fail 才扣 30,而 IP 来自已知代理隧道池(比如 ip2proxy 的 3 级代理类型)直接加 50 分——这个准得多。

告警输出别炸群,分级打。高风险走 webhook 推到钉钉/飞书群,但每个 Webhook 调用都用 asyncio.create_task() 包一层,不阻塞主流程。低风险写本地 SQLite 审计表就行,每天凌晨跑个汇总脚本。

async def trigger_webhook(score: int, evidence: list):
    payload = {
        "msgtype": "markdown",
        "markdown": {
            "title": f"🚨 高风险邮件 | 评分 {score}",
            "text": format_alert_markdown(evidence)
        }
    }
    async with httpx.AsyncClient() as client:
        await client.post(WEBHOOK_URL, json=payload)

审计表的字段别省。我吃了亏:最初只记了风险分和发件IP,出问题回溯时发现不知道当时哪个 Received 头触发的判断。后来加了 字段,存触发的具体检测项(比如 "geo_ip: country=NL, city=Amsterdam, proxy_score=0.85")。

这套 API 上线跑了三个月,误报率从最初的 12% 压到 2% 出头。大部分误报来自跨国企业的邮件中继——它们的 IP 在 geoip 库里落在美国,但 SPF 记录里却列了一大堆第三方的 CIDR 段。后来加了一条白名单逻辑:如果发送 IP 在 SPF 记录里有明确的 include 声明,直接跳过代理检测。这才消停。

上周三凌晨两点,告警群突然炸了。某个电商客户的订单确认邮件接口,十分钟内收到了两千多封所谓的“订单确认”——发件人显示的是他们自己的客服邮箱 orders@shop-example.com,主题行清一色“您的订单已确认 #202504xxxx”。但无一例外,这些订单号在数据库里根本查不到。

我拉了一封原始邮件头,直接看最顶部的 Received 头——注意,按 SMTP 协议,每个中继服务器都会把自己的记录压到最上面,所以最上面那个 Received 其实是最后一跳。看到的顺序大致是这样:

有意思。最底部的 Received 显示发信 IP 是 192.0.2.77,hostname 叫 ——这个域名连 PTR 记录都没有,纯纯的肉鸡或者临时搭建的垃圾中继。第二跳的 我查了 ip2proxy 数据库,代理类型标记为 VPN,置信度 0.97,而且地理位置落在荷兰阿姆斯特丹。但邮件的 From 头里声称发件服务器在美国弗吉尼亚。

SPF 验证结果直接是 fail。我查了 的 DNS 记录,它的 SPF 只允许自家邮件服务器 和第三方邮件中继 mailgun.org 发信,而 192.0.2.77 都不在其中。DKIM 签名呢?整封邮件根本没有 头——伪造者连伪造签名的功夫都省了。

地理围栏跑完一轮,结果特别干脆——发件 IP 的 GeoIP 定位在荷兰,可 SPF 记录里声明的合法发信范围清一色都是美国 IP 段。规则写得很死:只要 GeoIP 国家和 SPF 声明区域对不上,并且代理评分超过 0.8,直接判高风险。两千多封邮件,一个都没漏,全中了。

处理逻辑很简单:拦截,不投递。然后我写了个小脚本,把 整个 C 段扔进了 nginx 的黑名单——那个代理隧道的出口 IP 段。第二天早上,客户反馈说伪造邮件清零了,真正的订单确认反而因为之前被垃圾邮件挤占带宽而恢复通畅。

这种事见得多了就一个感受:邮件头里每个 Received 都是一枚指纹,伪造者能编造正文,却编不出一条干净的传输路径。