构建基于CQRS与Puppeteer的弹性图数据ETL管道及死信队列实践


最初的需求听起来并不复杂:从十几个合作方的旧版门户网站上抓取供应链的上下游依赖数据,并构建一个可视化的依赖关系图。然而,现实很快给了我们沉重一击。这些门户网站的稳定性极差,网页结构频繁变更,时不时还会弹出无法预测的验证码。一个简单的定时任务加上Puppeteer爬虫的方案,在试运行的第一天就因为各种超时和DOM节点找不到的错误而彻底瘫痪。数据丢失、任务中断,整个过程一团糟。我们意识到,必须设计一个能从容应对失败的系统,而不是一个假定一切顺利的系统。

传统的单体脚本在这种场景下毫无还手之力。每一次Puppeteer执行的失败,都可能导致整个批处理任务的终止,已经成功处理的数据和失败的数据混杂在一起,难以追溯。我们最初的实现大致是这样的:

// initial-brittle-script.js
// 这是一个反面教材,展示了脆弱的设计
const puppeteer = require('puppeteer');
const neo4j = require('neo4j-driver');

const driver = neo4j.driver('neo4j://localhost', neo4j.auth.basic('neo4j', 'password'));

async function scrapeAndStore(url) {
    const browser = await puppeteer.launch();
    const page = await browser.newPage();
    const session = driver.session();
    try {
        await page.goto(url, { waitUntil: 'networkidle2' });
        // ... 极其脆弱的DOM选择器逻辑 ...
        const supplierName = await page.$eval('.supplier-name', el => el.innerText);
        const dependents = await page.$$eval('.dependent-list li', lis => lis.map(li => li.innerText));

        // 直接写入数据库,紧耦合
        for (const dependent of dependents) {
            await session.run(
                'MERGE (s:Supplier {name: $supplier}) MERGE (d:Dependent {name: $dependent}) MERGE (s)-[:SUPPLIES_TO]->(d)',
                { supplier: supplierName, dependent: dependent }
            );
        }
        console.log(`Successfully processed ${url}`);
    } catch (error) {
        // 错误发生,整个任务可能就此中断
        console.error(`Failed to process ${url}:`, error);
        // 这里没有重试,没有隔离,数据丢失
    } finally {
        await session.close();
        await browser.close();
    }
}

// 假设我们有一个URL列表
const urls = ['http://partner-a.com/data', 'http://partner-b.com/data'];
(async () => {
    for (const url of urls) {
        await scrapeAndStore(url);
    }
    await driver.close();
})();

这种设计的核心问题在于过程的紧耦合和对失败的零容忍。抓取、解析、入库这三个步骤被捆绑在一个单一的、同步的流程中。任何一步的失败都会污染整个执行链。我们需要的是解耦、异步化以及一种优雅的失败处理机制。

架构转向:拥抱CQRS与事件驱动

问题的根源在于我们将“发起抓取任务”这个意图(Command)和“查询关系图”这个需求(Query)混为一谈,并且用一个脆弱的过程将它们连接起来。CQRS(命令查询职责分离)模式给了我们一个清晰的思路:将系统的写操作(命令)和读操作(查询)彻底分开。

  • 命令 (Command): “为目标URL启动一次供应链数据分析”。这是一个高延迟、高失败率的操作,因为它涉及到网络IO和浏览器自动化。
  • 查询 (Query): “查询A供应商的所有下游企业”。这需要快速响应,数据模型应该是为查询优化的。

将两者分开后,我们可以为写模型(命令处理)设计一套强大的弹性机制,而为读模型(查询服务)选用最适合其数据结构的存储。这里,Neo4j图数据库显然是关系数据查询的最佳选择。

为了实现这种分离并引入弹性,事件驱动架构(EDA)和消息队列是天然的粘合剂。整个系统的流程被重新设计:

  1. 一个API接收到分析请求,发布一个AnalyzeSupplierWebsite命令到消息队列。
  2. 一个独立的命令处理器(消费者)从队列中获取该命令,并执行重量级的Puppeteer抓取任务。
  3. 任务成功后,处理器会发布一个或多个SupplierRelationshipDiscovered事件。
  4. 一个或多个事件处理器监听这些事件,并将数据更新到为查询优化的读模型中(即我们的Neo4j数据库)。
  5. 如果Puppeteer任务失败,命令处理器不会确认消息,而是将其拒绝,消息队列会根据预设的策略将其路由到**死信队列 (Dead Letter Queue, DLQ)**。

这种架构的健壮性显而易见。抓取任务的失败被隔离在命令处理器中,不会影响到其他任务的执行,更不会污染读模型。失败的任务被安全地存放在DLQ中,等待后续的人工介入或自动重试,实现了“零数据丢失”。

graph TD
    subgraph "写模型 (Command Side)"
        API -->|1. 发送AnalyzeSupplierWebsite命令| CommandBus(RabbitMQ Exchange)
        CommandBus -- Route --> CommandQueue(爬取任务队列)
        CommandHandler(命令处理器 - Puppeteer) -- Consume --> CommandQueue
        CommandHandler -- 3a. 成功, 发布事件 --> EventBus(RabbitMQ Exchange)
        CommandHandler -- 3b. 失败, NACK --> CommandQueue
    end

    subgraph "读模型 (Query Side)"
        EventBus -- Route --> EventQueue(关系发现事件队列)
        EventHandler(事件处理器) -- Consume --> EventQueue
        EventHandler -- 更新/写入 --> Neo4j[(Neo4j 读模型)]
        User -- 查询 --> Neo4j
    end

    subgraph "故障处理 (Failure Handling)"
        CommandQueue -- 消息被拒绝 --> DeadLetterExchange(死信交换机)
        DeadLetterExchange -- Route --> DLQ(死信队列)
        DLQ_Consumer(DLQ处理器) -- Consume --> DLQ
        DLQ_Consumer -- 人工分析/告警/重试 --> Ops
    end

    style CommandHandler fill:#f9f,stroke:#333,stroke-width:2px
    style DLQ fill:#f00,stroke:#333,stroke-width:2px

核心实现:构建弹性的消息处理管道

我们选择RabbitMQ作为消息中间件,因为它对DLQ和复杂的路由策略提供了出色的支持。

1. RabbitMQ队列与DLQ配置

正确配置队列是实现弹性的第一步。我们需要一个主工作队列和一个死信队列。当主队列中的消息因为某些原因(被消费者拒绝、TTL过期等)无法被处理时,它会被自动转发到指定的死信交换机,进而路由到死信队列。

// rabbitmq-setup.js
const amqplib = require('amqplib');

const RABBITMQ_URL = 'amqp://guest:guest@localhost:5672';

// 定义常量,避免魔法字符串
const exchanges = {
    COMMANDS: 'commands_exchange',
    EVENTS: 'events_exchange',
    DEAD_LETTER: 'dead_letter_exchange'
};

const queues = {
    ANALYZE_WEBSITE: 'q_analyze_website',
    RELATIONSHIP_DISCOVERED: 'q_relationship_discovered',
    DEAD_LETTER_QUEUE: 'q_dead_letter'
};

const routingKeys = {
    ANALYZE_WEBSITE: 'cmd.analyze.website',
    RELATIONSHIP_DISCOVERED: 'evt.relationship.discovered'
};

async function setup() {
    const conn = await amqplib.connect(RABBITMQ_URL);
    const channel = await conn.createChannel();

    // --- 创建交换机 ---
    await channel.assertExchange(exchanges.COMMANDS, 'topic', { durable: true });
    await channel.assertExchange(exchanges.EVENTS, 'topic', { durable: true });
    await channel.assertExchange(exchanges.DEAD_LETTER, 'topic', { durable: true });
    console.log('Exchanges asserted.');

    // --- 创建死信队列和绑定 ---
    await channel.assertQueue(queues.DEAD_LETTER_QUEUE, { durable: true });
    await channel.bindQueue(queues.DEAD_LETTER_QUEUE, exchanges.DEAD_LETTER, '#'); // 监听所有死信消息
    console.log('Dead Letter Queue setup complete.');

    // --- 创建主工作队列,并配置其死信策略 ---
    await channel.assertQueue(queues.ANALYZE_WEBSITE, {
        durable: true,
        arguments: {
            'x-dead-letter-exchange': exchanges.DEAD_LETTER,
            // 可选:指定死信的routing key,如果不指定则使用原消息的key
            'x-dead-letter-routing-key': 'dead.letter' 
        }
    });
    await channel.bindQueue(queues.ANALYZE_WEBSITE, exchanges.COMMANDS, routingKeys.ANALYZE_WEBSITE);
    console.log('Main command queue setup complete.');

    // --- 创建事件队列 ---
    await channel.assertQueue(queues.RELATIONSHIP_DISCOVERED, { durable: true });
    await channel.bindQueue(queues.RELATIONSHIP_DISCOVERED, exchanges.EVENTS, routingKeys.RELATIONSHIP_DISCOVERED);
    console.log('Event queue setup complete.');

    await channel.close();
    await conn.close();
}

setup().catch(console.error);

这段设置代码是整个系统的基石。关键在于q_analyze_website队列的arguments参数,它明确指定了当消息“死亡”时,应该被发送到dead_letter_exchange

2. 命令处理器:Puppeteer与失败处理

命令处理器是系统中唯一执行“脏活累活”的地方。它的设计必须把失败作为一等公民来考虑。

// command-handler.js
const puppeteer = require('puppeteer');
const amqplib = require('amqplib');
// ... 引入上面定义的常量 ...

class CommandHandler {
    constructor() {
        this.channel = null;
        this.browser = null;
    }

    async initialize() {
        const conn = await amqplib.connect(RABBITMQ_URL);
        this.channel = await conn.createChannel();
        this.browser = await puppeteer.launch({
            // 生产环境建议使用更健壮的配置
            headless: true, 
            args: ['--no-sandbox', '--disable-setuid-sandbox']
        });
        console.log('Command Handler initialized.');
    }

    async listen() {
        await this.channel.prefetch(1); // 一次只处理一个消息,防止压垮Puppeteer
        console.log('Waiting for commands...');
        this.channel.consume(queues.ANALYZE_WEBSITE, async (msg) => {
            if (msg === null) return;

            const command = JSON.parse(msg.content.toString());
            console.log(`Received command to analyze: ${command.url}`);
            
            let page = null;
            try {
                page = await this.browser.newPage();
                await page.setDefaultNavigationTimeout(60000); // 设置60秒超时
                
                await page.goto(command.url, { waitUntil: 'domcontentloaded' });
                
                // 模拟一个可能失败的抓取逻辑
                const supplierName = await page.$eval('.supplier-name', el => el.innerText.trim());
                if (!supplierName) throw new Error('Supplier name not found');

                const dependents = await page.$$eval('.dependent-list li', 
                    lis => lis.map(li => li.innerText.trim()).filter(Boolean)
                );
                
                // 成功,发布事件
                const event = {
                    type: 'SupplierRelationshipDiscovered',
                    payload: {
                        supplier: supplierName,
                        dependents: dependents,
                        sourceUrl: command.url,
                        timestamp: new Date().toISOString()
                    }
                };
                this.channel.publish(
                    exchanges.EVENTS,
                    routingKeys.RELATIONSHIP_DISCOVERED,
                    Buffer.from(JSON.stringify(event))
                );

                console.log(`Successfully processed ${command.url}, published event.`);
                // 显式确认消息,RabbitMQ会将其从队列中删除
                this.channel.ack(msg);

            } catch (error) {
                console.error(`[CRITICAL] Failed to process ${command.url}: ${error.message}`);
                // 核心:处理失败。
                // nack(msg, false, false) -> 拒绝消息,并且不重新排队。
                // 由于队列配置了DLQ,这条消息将被立即路由到死信队列。
                this.channel.nack(msg, false, false);
            } finally {
                if (page) await page.close();
            }
        });
    }
}

const handler = new CommandHandler();
handler.initialize().then(() => handler.listen()).catch(console.error);

这里的关键是try...catch块和channel.nack(msg, false, false)的调用。第一个false表示只拒绝当前这一条消息,第二个false表示不要将它重新放回原队列的队头。正是这个requeue=false的设置,触发了RabbitMQ的死信机制。

3. 事件处理器:幂等地更新Neo4j

事件处理器负责消费领域事件,并更新读模型。它的职责相对单一,但必须保证操作的幂等性。无论同一个SupplierRelationshipDiscovered事件被消费多少次,最终在图数据库中的状态都应该是一致的。Neo4j的MERGE命令天生就适合这个场景。

// event-handler.js
const neo4j = require('neo4j-driver');
const amqplib = require('amqplib');
// ... 引入常量 ...

class EventHandler {
    constructor() {
        this.channel = null;
        this.neo4jDriver = neo4j.driver('neo4j://localhost', neo4j.auth.basic('neo4j', 'password'));
    }

    async initialize() {
        const conn = await amqplib.connect(RABBITMQ_URL);
        this.channel = await conn.createChannel();
        console.log('Event Handler initialized.');
    }

    async listen() {
        console.log('Waiting for events...');
        this.channel.consume(queues.RELATIONSHIP_DISCOVERED, async (msg) => {
            if (msg === null) return;

            const event = JSON.parse(msg.content.toString());
            const { supplier, dependents, sourceUrl } = event.payload;
            console.log(`Received event for supplier: ${supplier}`);
            
            const session = this.neo4jDriver.session({ database: 'neo4j' });
            try {
                // 使用事务保证原子性
                const tx = session.beginTransaction();

                // MERGE确保了节点的不重复创建
                await tx.run(
                    `MERGE (s:Supplier {name: $name})
                     ON CREATE SET s.createdAt = timestamp(), s.sourceUrl = $sourceUrl
                     ON MATCH SET s.updatedAt = timestamp(), s.sourceUrl = $sourceUrl`,
                    { name: supplier, sourceUrl: sourceUrl }
                );

                for (const dependent of dependents) {
                    await tx.run(
                        `MERGE (d:Company {name: $name})`,
                        { name: dependent }
                    );
                    // MERGE也确保了关系的不重复创建
                    await tx.run(
                        `MATCH (s:Supplier {name: $supplierName})
                         MATCH (d:Company {name: $dependentName})
                         MERGE (s)-[r:SUPPLIES_TO]->(d)
                         ON CREATE SET r.discoveredAt = timestamp()`,
                        { supplierName: supplier, dependentName: dependent }
                    );
                }

                await tx.commit();
                console.log(`Graph updated for supplier: ${supplier}`);
                this.channel.ack(msg);

            } catch (error) {
                console.error(`Failed to update Neo4j: ${error.message}`);
                // 这里的失败处理策略可以更复杂,例如也发送到另一个DLQ
                // 为简化,我们暂时只记录错误并拒绝消息(可能导致重试)
                this.channel.nack(msg, false, true); 
            } finally {
                await session.close();
            }
        });
    }

    async close() {
        await this.neo4jDriver.close();
    }
}

const handler = new EventHandler();
handler.initialize().then(() => handler.listen()).catch(console.error);

通过MERGE,我们可以安全地重放事件流,而不用担心会产生重复的节点或关系,这是保证最终一致性的关键。

4. 死信队列处理器:分析与决策

DLQ中的消息是宝贵的财富,它们揭示了系统的弱点。一个简单的DLQ处理器至少应该能将这些失败的消息记录下来,供人工分析。

// dlq-consumer.js
const amqplib = require('amqplib');
// ... 引入常量 ...

async function consumeDeadLetters() {
    const conn = await amqplib.connect(RABBITMQ_URL);
    const channel = await conn.createChannel();
    console.log('DLQ Consumer is running...');

    channel.consume(queues.DEAD_LETTER_QUEUE, (msg) => {
        if (msg !== null) {
            const originalContent = msg.content.toString();
            const deathInfo = msg.properties.headers['x-death'][0];

            console.log('--- Dead Letter Received ---');
            console.log(`Reason: ${deathInfo.reason}`);
            console.log(`Original Exchange: ${deathInfo.exchange}`);
            console.log(`Original Routing Key: ${deathInfo['routing-keys'][0]}`);
            console.log(`Timestamp: ${new Date(deathInfo.time.low * 1000)}`);
            console.log('Original Message Body:', originalContent);
            console.log('----------------------------\n');

            // 在这里可以集成告警系统,如 PagerDuty, Slack 等
            // 或者将消息持久化到 Elasticsearch/MongoDB 便于搜索和分析
            
            // 确认消息已从DLQ中处理
            channel.ack(msg);
        }
    });
}

consumeDeadLetters().catch(console.error);

在真实的生产环境中,DLQ处理器会更加智能。例如,它可以检查x-death头中的count字段来判断消息已经失败了多少次。对于某些瞬时错误(如网络抖动),可以设计一个简单的延迟重试逻辑,将消息重新发布到主工作队列。对于那些因DOM结构改变而导致的永久性失败,则应该触发告警,通知开发人员更新爬虫逻辑。

局限性与未来展望

这套架构并非银弹。首先,Puppeteer的资源消耗巨大。如果需要抓取的URL规模达到成千上万,单个节点的命令处理器很快会成为瓶颈。水平扩展处理器是可行的,但这会带来浏览器实例管理的复杂性。一个更优的方案是引入类似Browserless.io这样的浏览器即服务(Browser-as-a-Service)平台,将浏览器生命周期管理外包出去。

其次,CQRS引入了最终一致性。从命令发出到Neo4j中数据可查,中间存在延迟。对于我们的供应链分析场景,分钟级的数据延迟是完全可以接受的,但对于需要实时反馈的系统,就需要仔细评估这种延迟带来的影响。

最后,当前的DLQ处理机制还比较初级。一个完善的系统应该建立一套自动化的重试和降级策略。例如,可以根据失败原因(如HTTP 503 vs 404)将死信消息路由到不同的队列,一个用于自动延迟重试,另一个则直接归档并告警。这能极大地减少人工干预的成本,让系统真正具备自愈能力。


  目录