最初的需求听起来并不复杂:从十几个合作方的旧版门户网站上抓取供应链的上下游依赖数据,并构建一个可视化的依赖关系图。然而,现实很快给了我们沉重一击。这些门户网站的稳定性极差,网页结构频繁变更,时不时还会弹出无法预测的验证码。一个简单的定时任务加上Puppeteer爬虫的方案,在试运行的第一天就因为各种超时和DOM节点找不到的错误而彻底瘫痪。数据丢失、任务中断,整个过程一团糟。我们意识到,必须设计一个能从容应对失败的系统,而不是一个假定一切顺利的系统。
传统的单体脚本在这种场景下毫无还手之力。每一次Puppeteer执行的失败,都可能导致整个批处理任务的终止,已经成功处理的数据和失败的数据混杂在一起,难以追溯。我们最初的实现大致是这样的:
// initial-brittle-script.js
// 这是一个反面教材,展示了脆弱的设计
const puppeteer = require('puppeteer');
const neo4j = require('neo4j-driver');
const driver = neo4j.driver('neo4j://localhost', neo4j.auth.basic('neo4j', 'password'));
async function scrapeAndStore(url) {
const browser = await puppeteer.launch();
const page = await browser.newPage();
const session = driver.session();
try {
await page.goto(url, { waitUntil: 'networkidle2' });
// ... 极其脆弱的DOM选择器逻辑 ...
const supplierName = await page.$eval('.supplier-name', el => el.innerText);
const dependents = await page.$$eval('.dependent-list li', lis => lis.map(li => li.innerText));
// 直接写入数据库,紧耦合
for (const dependent of dependents) {
await session.run(
'MERGE (s:Supplier {name: $supplier}) MERGE (d:Dependent {name: $dependent}) MERGE (s)-[:SUPPLIES_TO]->(d)',
{ supplier: supplierName, dependent: dependent }
);
}
console.log(`Successfully processed ${url}`);
} catch (error) {
// 错误发生,整个任务可能就此中断
console.error(`Failed to process ${url}:`, error);
// 这里没有重试,没有隔离,数据丢失
} finally {
await session.close();
await browser.close();
}
}
// 假设我们有一个URL列表
const urls = ['http://partner-a.com/data', 'http://partner-b.com/data'];
(async () => {
for (const url of urls) {
await scrapeAndStore(url);
}
await driver.close();
})();
这种设计的核心问题在于过程的紧耦合和对失败的零容忍。抓取、解析、入库这三个步骤被捆绑在一个单一的、同步的流程中。任何一步的失败都会污染整个执行链。我们需要的是解耦、异步化以及一种优雅的失败处理机制。
架构转向:拥抱CQRS与事件驱动
问题的根源在于我们将“发起抓取任务”这个意图(Command)和“查询关系图”这个需求(Query)混为一谈,并且用一个脆弱的过程将它们连接起来。CQRS(命令查询职责分离)模式给了我们一个清晰的思路:将系统的写操作(命令)和读操作(查询)彻底分开。
- 命令 (Command): “为目标URL启动一次供应链数据分析”。这是一个高延迟、高失败率的操作,因为它涉及到网络IO和浏览器自动化。
- 查询 (Query): “查询A供应商的所有下游企业”。这需要快速响应,数据模型应该是为查询优化的。
将两者分开后,我们可以为写模型(命令处理)设计一套强大的弹性机制,而为读模型(查询服务)选用最适合其数据结构的存储。这里,Neo4j图数据库显然是关系数据查询的最佳选择。
为了实现这种分离并引入弹性,事件驱动架构(EDA)和消息队列是天然的粘合剂。整个系统的流程被重新设计:
- 一个API接收到分析请求,发布一个
AnalyzeSupplierWebsite命令到消息队列。 - 一个独立的命令处理器(消费者)从队列中获取该命令,并执行重量级的Puppeteer抓取任务。
- 任务成功后,处理器会发布一个或多个
SupplierRelationshipDiscovered事件。 - 一个或多个事件处理器监听这些事件,并将数据更新到为查询优化的读模型中(即我们的Neo4j数据库)。
- 如果Puppeteer任务失败,命令处理器不会确认消息,而是将其拒绝,消息队列会根据预设的策略将其路由到**死信队列 (Dead Letter Queue, DLQ)**。
这种架构的健壮性显而易见。抓取任务的失败被隔离在命令处理器中,不会影响到其他任务的执行,更不会污染读模型。失败的任务被安全地存放在DLQ中,等待后续的人工介入或自动重试,实现了“零数据丢失”。
graph TD
subgraph "写模型 (Command Side)"
API -->|1. 发送AnalyzeSupplierWebsite命令| CommandBus(RabbitMQ Exchange)
CommandBus -- Route --> CommandQueue(爬取任务队列)
CommandHandler(命令处理器 - Puppeteer) -- Consume --> CommandQueue
CommandHandler -- 3a. 成功, 发布事件 --> EventBus(RabbitMQ Exchange)
CommandHandler -- 3b. 失败, NACK --> CommandQueue
end
subgraph "读模型 (Query Side)"
EventBus -- Route --> EventQueue(关系发现事件队列)
EventHandler(事件处理器) -- Consume --> EventQueue
EventHandler -- 更新/写入 --> Neo4j[(Neo4j 读模型)]
User -- 查询 --> Neo4j
end
subgraph "故障处理 (Failure Handling)"
CommandQueue -- 消息被拒绝 --> DeadLetterExchange(死信交换机)
DeadLetterExchange -- Route --> DLQ(死信队列)
DLQ_Consumer(DLQ处理器) -- Consume --> DLQ
DLQ_Consumer -- 人工分析/告警/重试 --> Ops
end
style CommandHandler fill:#f9f,stroke:#333,stroke-width:2px
style DLQ fill:#f00,stroke:#333,stroke-width:2px
核心实现:构建弹性的消息处理管道
我们选择RabbitMQ作为消息中间件,因为它对DLQ和复杂的路由策略提供了出色的支持。
1. RabbitMQ队列与DLQ配置
正确配置队列是实现弹性的第一步。我们需要一个主工作队列和一个死信队列。当主队列中的消息因为某些原因(被消费者拒绝、TTL过期等)无法被处理时,它会被自动转发到指定的死信交换机,进而路由到死信队列。
// rabbitmq-setup.js
const amqplib = require('amqplib');
const RABBITMQ_URL = 'amqp://guest:guest@localhost:5672';
// 定义常量,避免魔法字符串
const exchanges = {
COMMANDS: 'commands_exchange',
EVENTS: 'events_exchange',
DEAD_LETTER: 'dead_letter_exchange'
};
const queues = {
ANALYZE_WEBSITE: 'q_analyze_website',
RELATIONSHIP_DISCOVERED: 'q_relationship_discovered',
DEAD_LETTER_QUEUE: 'q_dead_letter'
};
const routingKeys = {
ANALYZE_WEBSITE: 'cmd.analyze.website',
RELATIONSHIP_DISCOVERED: 'evt.relationship.discovered'
};
async function setup() {
const conn = await amqplib.connect(RABBITMQ_URL);
const channel = await conn.createChannel();
// --- 创建交换机 ---
await channel.assertExchange(exchanges.COMMANDS, 'topic', { durable: true });
await channel.assertExchange(exchanges.EVENTS, 'topic', { durable: true });
await channel.assertExchange(exchanges.DEAD_LETTER, 'topic', { durable: true });
console.log('Exchanges asserted.');
// --- 创建死信队列和绑定 ---
await channel.assertQueue(queues.DEAD_LETTER_QUEUE, { durable: true });
await channel.bindQueue(queues.DEAD_LETTER_QUEUE, exchanges.DEAD_LETTER, '#'); // 监听所有死信消息
console.log('Dead Letter Queue setup complete.');
// --- 创建主工作队列,并配置其死信策略 ---
await channel.assertQueue(queues.ANALYZE_WEBSITE, {
durable: true,
arguments: {
'x-dead-letter-exchange': exchanges.DEAD_LETTER,
// 可选:指定死信的routing key,如果不指定则使用原消息的key
'x-dead-letter-routing-key': 'dead.letter'
}
});
await channel.bindQueue(queues.ANALYZE_WEBSITE, exchanges.COMMANDS, routingKeys.ANALYZE_WEBSITE);
console.log('Main command queue setup complete.');
// --- 创建事件队列 ---
await channel.assertQueue(queues.RELATIONSHIP_DISCOVERED, { durable: true });
await channel.bindQueue(queues.RELATIONSHIP_DISCOVERED, exchanges.EVENTS, routingKeys.RELATIONSHIP_DISCOVERED);
console.log('Event queue setup complete.');
await channel.close();
await conn.close();
}
setup().catch(console.error);
这段设置代码是整个系统的基石。关键在于q_analyze_website队列的arguments参数,它明确指定了当消息“死亡”时,应该被发送到dead_letter_exchange。
2. 命令处理器:Puppeteer与失败处理
命令处理器是系统中唯一执行“脏活累活”的地方。它的设计必须把失败作为一等公民来考虑。
// command-handler.js
const puppeteer = require('puppeteer');
const amqplib = require('amqplib');
// ... 引入上面定义的常量 ...
class CommandHandler {
constructor() {
this.channel = null;
this.browser = null;
}
async initialize() {
const conn = await amqplib.connect(RABBITMQ_URL);
this.channel = await conn.createChannel();
this.browser = await puppeteer.launch({
// 生产环境建议使用更健壮的配置
headless: true,
args: ['--no-sandbox', '--disable-setuid-sandbox']
});
console.log('Command Handler initialized.');
}
async listen() {
await this.channel.prefetch(1); // 一次只处理一个消息,防止压垮Puppeteer
console.log('Waiting for commands...');
this.channel.consume(queues.ANALYZE_WEBSITE, async (msg) => {
if (msg === null) return;
const command = JSON.parse(msg.content.toString());
console.log(`Received command to analyze: ${command.url}`);
let page = null;
try {
page = await this.browser.newPage();
await page.setDefaultNavigationTimeout(60000); // 设置60秒超时
await page.goto(command.url, { waitUntil: 'domcontentloaded' });
// 模拟一个可能失败的抓取逻辑
const supplierName = await page.$eval('.supplier-name', el => el.innerText.trim());
if (!supplierName) throw new Error('Supplier name not found');
const dependents = await page.$$eval('.dependent-list li',
lis => lis.map(li => li.innerText.trim()).filter(Boolean)
);
// 成功,发布事件
const event = {
type: 'SupplierRelationshipDiscovered',
payload: {
supplier: supplierName,
dependents: dependents,
sourceUrl: command.url,
timestamp: new Date().toISOString()
}
};
this.channel.publish(
exchanges.EVENTS,
routingKeys.RELATIONSHIP_DISCOVERED,
Buffer.from(JSON.stringify(event))
);
console.log(`Successfully processed ${command.url}, published event.`);
// 显式确认消息,RabbitMQ会将其从队列中删除
this.channel.ack(msg);
} catch (error) {
console.error(`[CRITICAL] Failed to process ${command.url}: ${error.message}`);
// 核心:处理失败。
// nack(msg, false, false) -> 拒绝消息,并且不重新排队。
// 由于队列配置了DLQ,这条消息将被立即路由到死信队列。
this.channel.nack(msg, false, false);
} finally {
if (page) await page.close();
}
});
}
}
const handler = new CommandHandler();
handler.initialize().then(() => handler.listen()).catch(console.error);
这里的关键是try...catch块和channel.nack(msg, false, false)的调用。第一个false表示只拒绝当前这一条消息,第二个false表示不要将它重新放回原队列的队头。正是这个requeue=false的设置,触发了RabbitMQ的死信机制。
3. 事件处理器:幂等地更新Neo4j
事件处理器负责消费领域事件,并更新读模型。它的职责相对单一,但必须保证操作的幂等性。无论同一个SupplierRelationshipDiscovered事件被消费多少次,最终在图数据库中的状态都应该是一致的。Neo4j的MERGE命令天生就适合这个场景。
// event-handler.js
const neo4j = require('neo4j-driver');
const amqplib = require('amqplib');
// ... 引入常量 ...
class EventHandler {
constructor() {
this.channel = null;
this.neo4jDriver = neo4j.driver('neo4j://localhost', neo4j.auth.basic('neo4j', 'password'));
}
async initialize() {
const conn = await amqplib.connect(RABBITMQ_URL);
this.channel = await conn.createChannel();
console.log('Event Handler initialized.');
}
async listen() {
console.log('Waiting for events...');
this.channel.consume(queues.RELATIONSHIP_DISCOVERED, async (msg) => {
if (msg === null) return;
const event = JSON.parse(msg.content.toString());
const { supplier, dependents, sourceUrl } = event.payload;
console.log(`Received event for supplier: ${supplier}`);
const session = this.neo4jDriver.session({ database: 'neo4j' });
try {
// 使用事务保证原子性
const tx = session.beginTransaction();
// MERGE确保了节点的不重复创建
await tx.run(
`MERGE (s:Supplier {name: $name})
ON CREATE SET s.createdAt = timestamp(), s.sourceUrl = $sourceUrl
ON MATCH SET s.updatedAt = timestamp(), s.sourceUrl = $sourceUrl`,
{ name: supplier, sourceUrl: sourceUrl }
);
for (const dependent of dependents) {
await tx.run(
`MERGE (d:Company {name: $name})`,
{ name: dependent }
);
// MERGE也确保了关系的不重复创建
await tx.run(
`MATCH (s:Supplier {name: $supplierName})
MATCH (d:Company {name: $dependentName})
MERGE (s)-[r:SUPPLIES_TO]->(d)
ON CREATE SET r.discoveredAt = timestamp()`,
{ supplierName: supplier, dependentName: dependent }
);
}
await tx.commit();
console.log(`Graph updated for supplier: ${supplier}`);
this.channel.ack(msg);
} catch (error) {
console.error(`Failed to update Neo4j: ${error.message}`);
// 这里的失败处理策略可以更复杂,例如也发送到另一个DLQ
// 为简化,我们暂时只记录错误并拒绝消息(可能导致重试)
this.channel.nack(msg, false, true);
} finally {
await session.close();
}
});
}
async close() {
await this.neo4jDriver.close();
}
}
const handler = new EventHandler();
handler.initialize().then(() => handler.listen()).catch(console.error);
通过MERGE,我们可以安全地重放事件流,而不用担心会产生重复的节点或关系,这是保证最终一致性的关键。
4. 死信队列处理器:分析与决策
DLQ中的消息是宝贵的财富,它们揭示了系统的弱点。一个简单的DLQ处理器至少应该能将这些失败的消息记录下来,供人工分析。
// dlq-consumer.js
const amqplib = require('amqplib');
// ... 引入常量 ...
async function consumeDeadLetters() {
const conn = await amqplib.connect(RABBITMQ_URL);
const channel = await conn.createChannel();
console.log('DLQ Consumer is running...');
channel.consume(queues.DEAD_LETTER_QUEUE, (msg) => {
if (msg !== null) {
const originalContent = msg.content.toString();
const deathInfo = msg.properties.headers['x-death'][0];
console.log('--- Dead Letter Received ---');
console.log(`Reason: ${deathInfo.reason}`);
console.log(`Original Exchange: ${deathInfo.exchange}`);
console.log(`Original Routing Key: ${deathInfo['routing-keys'][0]}`);
console.log(`Timestamp: ${new Date(deathInfo.time.low * 1000)}`);
console.log('Original Message Body:', originalContent);
console.log('----------------------------\n');
// 在这里可以集成告警系统,如 PagerDuty, Slack 等
// 或者将消息持久化到 Elasticsearch/MongoDB 便于搜索和分析
// 确认消息已从DLQ中处理
channel.ack(msg);
}
});
}
consumeDeadLetters().catch(console.error);
在真实的生产环境中,DLQ处理器会更加智能。例如,它可以检查x-death头中的count字段来判断消息已经失败了多少次。对于某些瞬时错误(如网络抖动),可以设计一个简单的延迟重试逻辑,将消息重新发布到主工作队列。对于那些因DOM结构改变而导致的永久性失败,则应该触发告警,通知开发人员更新爬虫逻辑。
局限性与未来展望
这套架构并非银弹。首先,Puppeteer的资源消耗巨大。如果需要抓取的URL规模达到成千上万,单个节点的命令处理器很快会成为瓶颈。水平扩展处理器是可行的,但这会带来浏览器实例管理的复杂性。一个更优的方案是引入类似Browserless.io这样的浏览器即服务(Browser-as-a-Service)平台,将浏览器生命周期管理外包出去。
其次,CQRS引入了最终一致性。从命令发出到Neo4j中数据可查,中间存在延迟。对于我们的供应链分析场景,分钟级的数据延迟是完全可以接受的,但对于需要实时反馈的系统,就需要仔细评估这种延迟带来的影响。
最后,当前的DLQ处理机制还比较初级。一个完善的系统应该建立一套自动化的重试和降级策略。例如,可以根据失败原因(如HTTP 503 vs 404)将死信消息路由到不同的队列,一个用于自动延迟重试,另一个则直接归档并告警。这能极大地减少人工干预的成本,让系统真正具备自愈能力。