在规模化场景下运行无头浏览器(Headless Browser)任务,尤其是像 Puppeteer 这样的资源密集型应用,从来都不是一件简单的事。内存泄漏、僵尸进程、环境依赖地狱,这些问题在并发量上来之后会变得极其尖锐。最初的方案往往是一个简单的消息队列加上一堆常驻的消费者进程,但这很快就暴露了其脆弱性:不同任务的依赖库可能冲突,单个任务的崩溃可能污染整个工作进程,资源隔离几乎为零。
我们需要一个更健壮的架构。核心思路是将每个 Puppeteer 任务封装成一个隔离的、一次性的执行单元。OCI(Open Container Initiative)容器是实现这一点的标准答案。但随之而来的问题是,如何动态地调度这些容器化任务,如何管理它们的生命周期,如何持久化任务状态和结果,以及如何让最终用户以一种简单的方式与之交互?
这正是我们需要构建一个专用调度网关的原因。这个网关将作为系统的唯一入口,负责接收任务请求、将其持久化到高可用的数据库中,并与后端的容器编排系统(即使是一个简化的)进行交互。在这个架构中,技术选型至关重要。
- API 网关与代理: 我们需要一个轻量级、高性能的入口来处理 HTTP 请求。Go 语言的
net/http
包足以构建一个生产级的服务,无需引入重型框架。 - 任务持久化与状态管理: 任务队列需要一个高可用的存储。PostgreSQL 是个不错的选择,但为了应对未来的水平扩展和容错能力,我们选择 CockroachDB。它兼容 PostgreSQL 协议,同时提供原生的分布式事务和弹性伸缩能力,避免了单点故障。
- 任务执行单元: 将 Puppeteer 脚本与所有依赖打包成 OCI 镜像。这确保了环境的一致性和执行的隔离性。
- 数据检索: 任务执行后产生的数据需要被查询。与其引入另一个重量级组件如 Elasticsearch,我们可以挖掘 CockroachDB 的潜力,利用其内置的全文检索(Full-Text Search)功能来满足基本的搜索需求。这是一个典型的架构权衡,用单一数据存储的便利性换取可能稍弱的专业搜索能力。
整个系统的核心工作流程可以用下面的时序图来描述:
sequenceDiagram participant Client as 客户端 participant Gateway as 调度网关 (Go) participant CockroachDB as 分布式数据库 participant Scheduler as 任务调度器 (概念) participant OCIWorker as Puppeteer 容器 Client->>+Gateway: POST /tasks (URL, extractor_name) Gateway->>+CockroachDB: INSERT INTO tasks (status='PENDING', ...) CockroachDB-->>-Gateway: 返回 Task ID Gateway-->>-Client: 202 Accepted (Task ID) loop 轮询任务 Scheduler->>+CockroachDB: SELECT * FROM tasks WHERE status='PENDING' LIMIT 1; CockroachDB-->>-Scheduler: 返回待处理任务 Scheduler->>+CockroachDB: UPDATE tasks SET status='RUNNING' WHERE id=...; CockroachDB-->>-Scheduler: 更新成功 Scheduler->>OCIWorker: 启动容器 (e.g., docker run puppet-worker --task-id=...) end OCIWorker->>+CockroachDB: SELECT * FROM tasks WHERE id=...; CockroachDB-->>-OCIWorker: 返回任务详情 OCIWorker-->>OCIWorker: 执行 Puppeteer 脚本 OCIWorker->>+CockroachDB: INSERT INTO results (task_id, content); CockroachDB-->>-OCIWorker: 存储成功 OCIWorker->>+CockroachDB: UPDATE tasks SET status='COMPLETED' WHERE id=...; CockroachDB-->>-OCIWorker: 更新成功 Client->>+Gateway: GET /search?q=keyword Gateway->>+CockroachDB: SELECT ... FROM results WHERE content @@ to_tsquery('keyword'); CockroachDB-->>-Gateway: 返回搜索结果 Gateway-->>-Client: 200 OK (搜索结果)
数据库模型设计与全文检索索引
在 CockroachDB 中,我们需要两张核心表:tasks
用于管理任务元数据,task_results
用于存储提取的数据并支持搜索。
这里的关键在于 task_results
表的设计。我们将在 extracted_content
字段上创建一个倒排索引(Inverted Index),这是实现高效全文检索的基础。CockroachDB 兼容 PostgreSQL 的 FTS 语法,这让实现变得非常直接。
-- tasks 表用于跟踪每个抓取任务的状态
CREATE TABLE IF NOT EXISTS tasks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
target_url STRING NOT NULL,
extractor_name STRING NOT NULL, -- 对应要使用的 OCI 镜像/脚本
status STRING NOT NULL CHECK (status IN ('PENDING', 'RUNNING', 'COMPLETED', 'FAILED')),
attempts INT DEFAULT 0,
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now()
);
-- task_results 表存储从网页中提取的结构化数据
CREATE TABLE IF NOT EXISTS task_results (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
task_id UUID NOT NULL REFERENCES tasks(id) ON DELETE CASCADE,
extracted_content JSONB NOT NULL, -- 存储提取的 JSON 数据
-- 创建一个 tsvector 列,用于全文搜索预处理
content_tsv TSVECTOR,
created_at TIMESTAMPTZ DEFAULT now()
);
-- 在 content_tsv 上创建 GIN 倒排索引以加速全文搜索
-- 在真实项目中,这个索引的创建和更新需要通过触发器自动完成
CREATE INVERTED INDEX IF NOT EXISTS results_content_fts_idx ON task_results(content_tsv);
-- 创建一个触发器函数,在插入或更新时自动更新 tsvector 列
-- CockroachDB v21.2+ 支持
CREATE OR REPLACE FUNCTION update_tsv() RETURNS TRIGGER AS $$
BEGIN
-- 我们假设 JSONB 中有一个 'text_content' 字段用于索引
NEW.content_tsv := to_tsvector('simple', COALESCE(NEW.extracted_content->>'text_content', ''));
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER tsv_update BEFORE INSERT OR UPDATE ON task_results
FOR EACH ROW EXECUTE PROCEDURE update_tsv();
这个 schema 设计考虑了任务的生命周期管理(status
, attempts
),并将结果与任务解耦。使用 JSONB
类型可以灵活存储非结构化数据,而 TSVECTOR
和 GIN 索引则是性能的关键。在真实项目中,'simple'
分词器可能需要根据语言换成 'english'
或其他更复杂的分词器。
Go 实现的调度网关
网关是系统的入口,必须稳定可靠。它负责两件事:接收新任务并将其存入 CockroachDB,以及提供一个搜索接口。
// main.go
package main
import (
"database/sql"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"time"
_ "github.com/jackc/pgx/v4/stdlib"
)
type TaskRequest struct {
URL string `json:"url"`
ExtractorName string `json:"extractor_name"`
}
type TaskResponse struct {
TaskID string `json:"task_id"`
}
type SearchResult struct {
TaskID string `json:"task_id"`
Content json.RawMessage `json:"content"`
URL string `json:"url"`
ScrapedAt time.Time `json:"scraped_at"`
}
var db *sql.DB
func main() {
// 从环境变量获取 CockroachDB 连接字符串
// e.g., "postgresql://user:password@host:port/database?sslmode=verify-full"
connStr := os.Getenv("DATABASE_URL")
if connStr == "" {
log.Fatal("DATABASE_URL environment variable must be set")
}
var err error
db, err = sql.Open("pgx", connStr)
if err != nil {
log.Fatalf("failed to connect to database: %v", err)
}
defer db.Close()
db.SetMaxOpenConns(20)
db.SetMaxIdleConns(10)
db.SetConnMaxLifetime(time.Hour)
http.HandleFunc("/tasks", createTaskHandler)
http.HandleFunc("/search", searchHandler)
log.Println("Starting gateway server on :8080")
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatalf("server failed to start: %v", err)
}
}
func createTaskHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed)
return
}
var req TaskRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// 基本的输入验证
if req.URL == "" || req.ExtractorName == "" {
http.Error(w, "URL and extractor_name are required", http.StatusBadRequest)
return
}
var taskID string
// 事务性地插入任务,确保原子性
tx, err := db.Begin()
if err != nil {
http.Error(w, fmt.Sprintf("failed to begin transaction: %v", err), http.StatusInternalServerError)
return
}
defer tx.Rollback() // 如果没有 commit,则回滚
query := `INSERT INTO tasks (target_url, extractor_name, status) VALUES ($1, $2, 'PENDING') RETURNING id`
err = tx.QueryRow(query, req.URL, req.ExtractorName).Scan(&taskID)
if err != nil {
http.Error(w, fmt.Sprintf("failed to create task: %v", err), http.StatusInternalServerError)
return
}
if err := tx.Commit(); err != nil {
http.Error(w, fmt.Sprintf("failed to commit transaction: %v", err), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusAccepted)
json.NewEncoder(w).Encode(TaskResponse{TaskID: taskID})
}
func searchHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Only GET method is allowed", http.StatusMethodNotAllowed)
return
}
queryTerm := r.URL.Query().Get("q")
if queryTerm == "" {
http.Error(w, "Query parameter 'q' is required", http.StatusBadRequest)
return
}
// 使用 to_tsquery 进行全文搜索
// 注意:在生产环境中,需要对 queryTerm 进行清理以防止 SQL 注入,尽管 tsquery 本身相对安全。
query := `
SELECT r.task_id, r.extracted_content, t.target_url, r.created_at
FROM task_results r
JOIN tasks t ON r.task_id = t.id
WHERE r.content_tsv @@ to_tsquery('simple', $1)
ORDER BY r.created_at DESC
LIMIT 20;
`
rows, err := db.Query(query, queryTerm)
if err != nil {
log.Printf("Search query failed: %v", err)
http.Error(w, "Failed to execute search", http.StatusInternalServerError)
return
}
defer rows.Close()
var results []SearchResult
for rows.Next() {
var res SearchResult
var contentBytes []byte
if err := rows.Scan(&res.TaskID, &contentBytes, &res.URL, &res.ScrapedAt); err != nil {
log.Printf("Failed to scan search result row: %v", err)
continue
}
res.Content = json.RawMessage(contentBytes)
results = append(results, res)
}
if err := rows.Err(); err != nil {
log.Printf("Error iterating search results: %v", err)
http.Error(w, "Error processing search results", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(results)
}
这段代码是一个最小但完整的网关。它处理了数据库连接池、事务、JSON 编解码和基本的错误处理。日志记录对于调试生产问题至关重要。
OCI 化的 Puppeteer 工作单元
每个 Puppeteer 任务都应该在一个干净、隔离的环境中运行。Dockerfile 是定义这个环境的关键。
# 使用官方提供的带有 Puppeteer 依赖的 Node.js 镜像
FROM ghcr.io/puppeteer/puppeteer:21.4.0
# 将工作目录切换到 /usr/src/app
WORKDIR /usr/src/app
# 拷贝 package.json 和 package-lock.json
COPY package*.json ./
# 安装依赖
# --frozen-lockfile 确保使用 lockfile 中的精确版本
RUN npm install --frozen-lockfile
# 拷贝应用代码
COPY . .
# 定义容器启动时执行的命令
# worker.js 将作为入口点,并通过环境变量接收任务ID
CMD ["node", "worker.js"]
这个 Dockerfile 使用了 Puppeteer 官方提供的镜像,它预装了所有必需的系统依赖项,避免了手动安装 Chrome 和各种库的麻烦。
worker 脚本是执行实际工作的核心。它必须是无状态的,所有状态信息都从数据库中获取。
// worker.js
const puppeteer = require('puppeteer');
const { Client } = require('pg');
const TASK_ID = process.env.TASK_ID;
const DATABASE_URL = process.env.DATABASE_URL;
if (!TASK_ID || !DATABASE_URL) {
console.error('Error: TASK_ID and DATABASE_URL environment variables are required.');
process.exit(1);
}
// 简单的日志函数,带时间戳
const log = (message) => console.log(`[${new Date().toISOString()}] ${message}`);
async function run() {
const dbClient = new Client({ connectionString: DATABASE_URL });
let browser = null;
try {
log(`Worker starting for task: ${TASK_ID}`);
await dbClient.connect();
// 1. 获取任务详情
const taskRes = await dbClient.query('SELECT target_url, extractor_name FROM tasks WHERE id = $1', [TASK_ID]);
if (taskRes.rows.length === 0) {
throw new Error(`Task with ID ${TASK_ID} not found.`);
}
const { target_url, extractor_name } = taskRes.rows[0];
log(`Fetching URL: ${target_url} with extractor: ${extractor_name}`);
// 2. 启动 Puppeteer
// 在容器中运行时,必须使用 --no-sandbox 参数
// --disable-dev-shm-usage 也是一个常见的优化,防止 /dev/shm 空间不足
browser = await puppeteer.launch({
headless: 'new',
args: ['--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage']
});
const page = await browser.newPage();
// 设置合理的超时
await page.setDefaultNavigationTimeout(60000); // 60秒导航超时
// 3. 执行抓取
await page.goto(target_url, { waitUntil: 'networkidle2' });
// 4. 动态加载并执行提取器逻辑
// 在真实项目中,这里可以设计得更复杂,比如从 S3 或 Git 加载脚本
let extractedData;
if (extractor_name === 'basic_title_and_text') {
extractedData = await page.evaluate(() => {
return {
title: document.title,
text_content: document.body.innerText.substring(0, 5000) // 限制长度
};
});
} else {
throw new Error(`Unknown extractor: ${extractor_name}`);
}
log(`Extraction successful. Title: ${extractedData.title}`);
// 5. 存储结果到数据库
const resultQuery = 'INSERT INTO task_results (task_id, extracted_content) VALUES ($1, $2)';
await dbClient.query(resultQuery, [TASK_ID, extractedData]);
log('Result saved to database.');
// 6. 更新任务状态为 COMPLETED
const updateQuery = "UPDATE tasks SET status = 'COMPLETED', updated_at = now() WHERE id = $1";
await dbClient.query(updateQuery, [TASK_ID]);
log(`Task ${TASK_ID} marked as COMPLETED.`);
} catch (error) {
log(`Error processing task ${TASK_ID}: ${error.message}`);
// 发生错误时,将任务状态标记为 FAILED
try {
const attemptsRes = await dbClient.query("SELECT attempts FROM tasks WHERE id = $1", [TASK_ID]);
const newAttempts = (attemptsRes.rows[0]?.attempts || 0) + 1;
const failQuery = "UPDATE tasks SET status = 'FAILED', attempts = $1, updated_at = now() WHERE id = $2";
await dbClient.query(failQuery, [newAttempts, TASK_ID]);
} catch (dbError) {
log(`Failed to mark task as FAILED: ${dbError.message}`);
}
process.exit(1); // 以非零状态码退出,表示失败
} finally {
if (browser) {
await browser.close();
}
await dbClient.end();
log('Worker finished.');
}
}
run();
这个 worker 脚本是生产级的。它包含了详细的错误处理逻辑,当抓取或存储失败时,会更新任务状态为 FAILED
并增加重试次数。finally
块确保了无论成功还是失败,浏览器实例和数据库连接都会被正确关闭,这是防止资源泄漏的关键。
架构的局限性与未来展望
这个架构解决了很多规模化运行 Puppeteer 的核心问题:通过 OCI 容器实现了环境隔离和依赖管理,通过 CockroachDB 实现了任务持久化和系统韧性,通过 API 网关提供了清晰的服务边界。然而,它并非没有局限性。
首先,任务调度器(Scheduler)在本文中是一个概念性的存在。一个简单的、基于轮询的调度器在小规模时可以工作,但无法高效地扩展。在生产环境中,这部分应该由 Kubernetes 来承担。可以创建一个自定义的 Kubernetes Operator 来监听 tasks
表的变化(例如通过 Debezium CDC 或轮询),并为每个 PENDING
的任务动态创建一个 Kubernetes Job。这能更好地利用集群资源,并提供更复杂的重试和生命周期管理策略。
其次,使用 CockroachDB 的全文检索是一个务实的权衡。它简化了技术栈,但对于复杂的搜索需求,如多语言分词、同义词、权重调整、聚合和分面搜索,功能远不如专门的搜索引擎(如 Elasticsearch 或 OpenSearch)。当搜索成为系统的核心功能而非辅助功能时,引入一个专用的搜索集群,并通过数据管道(如 Kafka Connect)将 task_results
同步过去,将是必然的演进方向。
最后,系统的可观测性仍有待加强。虽然我们加入了一些日志,但一个完整的可观测性体系需要引入结构化日志、分布式追踪(例如通过 OpenTelemetry),以及关键指标的监控(例如任务处理延迟、成功/失败率、数据库连接池使用情况)。这些是确保系统在生产环境中稳定运行并能快速定位问题的基石。