构建基于OCI的动态Puppeteer工作负载调度网关并利用CockroachDB实现任务持久化与全文检索


在规模化场景下运行无头浏览器(Headless Browser)任务,尤其是像 Puppeteer 这样的资源密集型应用,从来都不是一件简单的事。内存泄漏、僵尸进程、环境依赖地狱,这些问题在并发量上来之后会变得极其尖锐。最初的方案往往是一个简单的消息队列加上一堆常驻的消费者进程,但这很快就暴露了其脆弱性:不同任务的依赖库可能冲突,单个任务的崩溃可能污染整个工作进程,资源隔离几乎为零。

我们需要一个更健壮的架构。核心思路是将每个 Puppeteer 任务封装成一个隔离的、一次性的执行单元。OCI(Open Container Initiative)容器是实现这一点的标准答案。但随之而来的问题是,如何动态地调度这些容器化任务,如何管理它们的生命周期,如何持久化任务状态和结果,以及如何让最终用户以一种简单的方式与之交互?

这正是我们需要构建一个专用调度网关的原因。这个网关将作为系统的唯一入口,负责接收任务请求、将其持久化到高可用的数据库中,并与后端的容器编排系统(即使是一个简化的)进行交互。在这个架构中,技术选型至关重要。

  • API 网关与代理: 我们需要一个轻量级、高性能的入口来处理 HTTP 请求。Go 语言的 net/http 包足以构建一个生产级的服务,无需引入重型框架。
  • 任务持久化与状态管理: 任务队列需要一个高可用的存储。PostgreSQL 是个不错的选择,但为了应对未来的水平扩展和容错能力,我们选择 CockroachDB。它兼容 PostgreSQL 协议,同时提供原生的分布式事务和弹性伸缩能力,避免了单点故障。
  • 任务执行单元: 将 Puppeteer 脚本与所有依赖打包成 OCI 镜像。这确保了环境的一致性和执行的隔离性。
  • 数据检索: 任务执行后产生的数据需要被查询。与其引入另一个重量级组件如 Elasticsearch,我们可以挖掘 CockroachDB 的潜力,利用其内置的全文检索(Full-Text Search)功能来满足基本的搜索需求。这是一个典型的架构权衡,用单一数据存储的便利性换取可能稍弱的专业搜索能力。

整个系统的核心工作流程可以用下面的时序图来描述:

sequenceDiagram
    participant Client as 客户端
    participant Gateway as 调度网关 (Go)
    participant CockroachDB as 分布式数据库
    participant Scheduler as 任务调度器 (概念)
    participant OCIWorker as Puppeteer 容器

    Client->>+Gateway: POST /tasks (URL, extractor_name)
    Gateway->>+CockroachDB: INSERT INTO tasks (status='PENDING', ...)
    CockroachDB-->>-Gateway: 返回 Task ID
    Gateway-->>-Client: 202 Accepted (Task ID)

    loop 轮询任务
        Scheduler->>+CockroachDB: SELECT * FROM tasks WHERE status='PENDING' LIMIT 1;
        CockroachDB-->>-Scheduler: 返回待处理任务
        Scheduler->>+CockroachDB: UPDATE tasks SET status='RUNNING' WHERE id=...;
        CockroachDB-->>-Scheduler: 更新成功
        Scheduler->>OCIWorker: 启动容器 (e.g., docker run puppet-worker --task-id=...)
    end

    OCIWorker->>+CockroachDB: SELECT * FROM tasks WHERE id=...;
    CockroachDB-->>-OCIWorker: 返回任务详情
    OCIWorker-->>OCIWorker: 执行 Puppeteer 脚本
    OCIWorker->>+CockroachDB: INSERT INTO results (task_id, content);
    CockroachDB-->>-OCIWorker: 存储成功
    OCIWorker->>+CockroachDB: UPDATE tasks SET status='COMPLETED' WHERE id=...;
    CockroachDB-->>-OCIWorker: 更新成功

    Client->>+Gateway: GET /search?q=keyword
    Gateway->>+CockroachDB: SELECT ... FROM results WHERE content @@ to_tsquery('keyword');
    CockroachDB-->>-Gateway: 返回搜索结果
    Gateway-->>-Client: 200 OK (搜索结果)

数据库模型设计与全文检索索引

在 CockroachDB 中,我们需要两张核心表:tasks 用于管理任务元数据,task_results 用于存储提取的数据并支持搜索。

这里的关键在于 task_results 表的设计。我们将在 extracted_content 字段上创建一个倒排索引(Inverted Index),这是实现高效全文检索的基础。CockroachDB 兼容 PostgreSQL 的 FTS 语法,这让实现变得非常直接。

-- tasks 表用于跟踪每个抓取任务的状态
CREATE TABLE IF NOT EXISTS tasks (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    target_url STRING NOT NULL,
    extractor_name STRING NOT NULL, -- 对应要使用的 OCI 镜像/脚本
    status STRING NOT NULL CHECK (status IN ('PENDING', 'RUNNING', 'COMPLETED', 'FAILED')),
    attempts INT DEFAULT 0,
    created_at TIMESTAMPTZ DEFAULT now(),
    updated_at TIMESTAMPTZ DEFAULT now()
);

-- task_results 表存储从网页中提取的结构化数据
CREATE TABLE IF NOT EXISTS task_results (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    task_id UUID NOT NULL REFERENCES tasks(id) ON DELETE CASCADE,
    extracted_content JSONB NOT NULL, -- 存储提取的 JSON 数据
    -- 创建一个 tsvector 列,用于全文搜索预处理
    content_tsv TSVECTOR,
    created_at TIMESTAMPTZ DEFAULT now()
);

-- 在 content_tsv 上创建 GIN 倒排索引以加速全文搜索
-- 在真实项目中,这个索引的创建和更新需要通过触发器自动完成
CREATE INVERTED INDEX IF NOT EXISTS results_content_fts_idx ON task_results(content_tsv);

-- 创建一个触发器函数,在插入或更新时自动更新 tsvector 列
-- CockroachDB v21.2+ 支持
CREATE OR REPLACE FUNCTION update_tsv() RETURNS TRIGGER AS $$
BEGIN
    -- 我们假设 JSONB 中有一个 'text_content' 字段用于索引
    NEW.content_tsv := to_tsvector('simple', COALESCE(NEW.extracted_content->>'text_content', ''));
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER tsv_update BEFORE INSERT OR UPDATE ON task_results
    FOR EACH ROW EXECUTE PROCEDURE update_tsv();

这个 schema 设计考虑了任务的生命周期管理(status, attempts),并将结果与任务解耦。使用 JSONB 类型可以灵活存储非结构化数据,而 TSVECTOR 和 GIN 索引则是性能的关键。在真实项目中,'simple' 分词器可能需要根据语言换成 'english' 或其他更复杂的分词器。

Go 实现的调度网关

网关是系统的入口,必须稳定可靠。它负责两件事:接收新任务并将其存入 CockroachDB,以及提供一个搜索接口。

// main.go
package main

import (
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"time"

	_ "github.com/jackc/pgx/v4/stdlib"
)

type TaskRequest struct {
	URL            string `json:"url"`
	ExtractorName  string `json:"extractor_name"`
}

type TaskResponse struct {
	TaskID string `json:"task_id"`
}

type SearchResult struct {
	TaskID   string          `json:"task_id"`
	Content  json.RawMessage `json:"content"`
	URL      string          `json:"url"`
	ScrapedAt time.Time       `json:"scraped_at"`
}

var db *sql.DB

func main() {
	// 从环境变量获取 CockroachDB 连接字符串
	// e.g., "postgresql://user:password@host:port/database?sslmode=verify-full"
	connStr := os.Getenv("DATABASE_URL")
	if connStr == "" {
		log.Fatal("DATABASE_URL environment variable must be set")
	}

	var err error
	db, err = sql.Open("pgx", connStr)
	if err != nil {
		log.Fatalf("failed to connect to database: %v", err)
	}
	defer db.Close()
	
	db.SetMaxOpenConns(20)
	db.SetMaxIdleConns(10)
	db.SetConnMaxLifetime(time.Hour)

	http.HandleFunc("/tasks", createTaskHandler)
	http.HandleFunc("/search", searchHandler)

	log.Println("Starting gateway server on :8080")
	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatalf("server failed to start: %v", err)
	}
}

func createTaskHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed)
		return
	}

	var req TaskRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// 基本的输入验证
	if req.URL == "" || req.ExtractorName == "" {
		http.Error(w, "URL and extractor_name are required", http.StatusBadRequest)
		return
	}

	var taskID string
	// 事务性地插入任务,确保原子性
	tx, err := db.Begin()
	if err != nil {
		http.Error(w, fmt.Sprintf("failed to begin transaction: %v", err), http.StatusInternalServerError)
		return
	}
	defer tx.Rollback() // 如果没有 commit,则回滚

	query := `INSERT INTO tasks (target_url, extractor_name, status) VALUES ($1, $2, 'PENDING') RETURNING id`
	err = tx.QueryRow(query, req.URL, req.ExtractorName).Scan(&taskID)
	if err != nil {
		http.Error(w, fmt.Sprintf("failed to create task: %v", err), http.StatusInternalServerError)
		return
	}
	
	if err := tx.Commit(); err != nil {
		http.Error(w, fmt.Sprintf("failed to commit transaction: %v", err), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusAccepted)
	json.NewEncoder(w).Encode(TaskResponse{TaskID: taskID})
}

func searchHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "Only GET method is allowed", http.StatusMethodNotAllowed)
		return
	}

	queryTerm := r.URL.Query().Get("q")
	if queryTerm == "" {
		http.Error(w, "Query parameter 'q' is required", http.StatusBadRequest)
		return
	}
	
	// 使用 to_tsquery 进行全文搜索
	// 注意:在生产环境中,需要对 queryTerm 进行清理以防止 SQL 注入,尽管 tsquery 本身相对安全。
	query := `
		SELECT r.task_id, r.extracted_content, t.target_url, r.created_at
		FROM task_results r
		JOIN tasks t ON r.task_id = t.id
		WHERE r.content_tsv @@ to_tsquery('simple', $1)
		ORDER BY r.created_at DESC
		LIMIT 20;
	`
	rows, err := db.Query(query, queryTerm)
	if err != nil {
		log.Printf("Search query failed: %v", err)
		http.Error(w, "Failed to execute search", http.StatusInternalServerError)
		return
	}
	defer rows.Close()

	var results []SearchResult
	for rows.Next() {
		var res SearchResult
		var contentBytes []byte
		if err := rows.Scan(&res.TaskID, &contentBytes, &res.URL, &res.ScrapedAt); err != nil {
			log.Printf("Failed to scan search result row: %v", err)
			continue
		}
		res.Content = json.RawMessage(contentBytes)
		results = append(results, res)
	}
	
	if err := rows.Err(); err != nil {
        log.Printf("Error iterating search results: %v", err)
		http.Error(w, "Error processing search results", http.StatusInternalServerError)
		return
    }

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(results)
}

这段代码是一个最小但完整的网关。它处理了数据库连接池、事务、JSON 编解码和基本的错误处理。日志记录对于调试生产问题至关重要。

OCI 化的 Puppeteer 工作单元

每个 Puppeteer 任务都应该在一个干净、隔离的环境中运行。Dockerfile 是定义这个环境的关键。

# 使用官方提供的带有 Puppeteer 依赖的 Node.js 镜像
FROM ghcr.io/puppeteer/puppeteer:21.4.0

# 将工作目录切换到 /usr/src/app
WORKDIR /usr/src/app

# 拷贝 package.json 和 package-lock.json
COPY package*.json ./

# 安装依赖
# --frozen-lockfile 确保使用 lockfile 中的精确版本
RUN npm install --frozen-lockfile

# 拷贝应用代码
COPY . .

# 定义容器启动时执行的命令
# worker.js 将作为入口点,并通过环境变量接收任务ID
CMD ["node", "worker.js"]

这个 Dockerfile 使用了 Puppeteer 官方提供的镜像,它预装了所有必需的系统依赖项,避免了手动安装 Chrome 和各种库的麻烦。

worker 脚本是执行实际工作的核心。它必须是无状态的,所有状态信息都从数据库中获取。

// worker.js
const puppeteer = require('puppeteer');
const { Client } = require('pg');

const TASK_ID = process.env.TASK_ID;
const DATABASE_URL = process.env.DATABASE_URL;

if (!TASK_ID || !DATABASE_URL) {
    console.error('Error: TASK_ID and DATABASE_URL environment variables are required.');
    process.exit(1);
}

// 简单的日志函数,带时间戳
const log = (message) => console.log(`[${new Date().toISOString()}] ${message}`);

async function run() {
    const dbClient = new Client({ connectionString: DATABASE_URL });
    let browser = null;

    try {
        log(`Worker starting for task: ${TASK_ID}`);
        await dbClient.connect();

        // 1. 获取任务详情
        const taskRes = await dbClient.query('SELECT target_url, extractor_name FROM tasks WHERE id = $1', [TASK_ID]);
        if (taskRes.rows.length === 0) {
            throw new Error(`Task with ID ${TASK_ID} not found.`);
        }
        const { target_url, extractor_name } = taskRes.rows[0];
        log(`Fetching URL: ${target_url} with extractor: ${extractor_name}`);
        
        // 2. 启动 Puppeteer
        // 在容器中运行时,必须使用 --no-sandbox 参数
        // --disable-dev-shm-usage 也是一个常见的优化,防止 /dev/shm 空间不足
        browser = await puppeteer.launch({
            headless: 'new',
            args: ['--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage']
        });
        const page = await browser.newPage();
        
        // 设置合理的超时
        await page.setDefaultNavigationTimeout(60000); // 60秒导航超时

        // 3. 执行抓取
        await page.goto(target_url, { waitUntil: 'networkidle2' });

        // 4. 动态加载并执行提取器逻辑
        // 在真实项目中,这里可以设计得更复杂,比如从 S3 或 Git 加载脚本
        let extractedData;
        if (extractor_name === 'basic_title_and_text') {
            extractedData = await page.evaluate(() => {
                return {
                    title: document.title,
                    text_content: document.body.innerText.substring(0, 5000) // 限制长度
                };
            });
        } else {
            throw new Error(`Unknown extractor: ${extractor_name}`);
        }
        
        log(`Extraction successful. Title: ${extractedData.title}`);

        // 5. 存储结果到数据库
        const resultQuery = 'INSERT INTO task_results (task_id, extracted_content) VALUES ($1, $2)';
        await dbClient.query(resultQuery, [TASK_ID, extractedData]);
        log('Result saved to database.');

        // 6. 更新任务状态为 COMPLETED
        const updateQuery = "UPDATE tasks SET status = 'COMPLETED', updated_at = now() WHERE id = $1";
        await dbClient.query(updateQuery, [TASK_ID]);
        log(`Task ${TASK_ID} marked as COMPLETED.`);

    } catch (error) {
        log(`Error processing task ${TASK_ID}: ${error.message}`);
        // 发生错误时,将任务状态标记为 FAILED
        try {
            const attemptsRes = await dbClient.query("SELECT attempts FROM tasks WHERE id = $1", [TASK_ID]);
            const newAttempts = (attemptsRes.rows[0]?.attempts || 0) + 1;
            const failQuery = "UPDATE tasks SET status = 'FAILED', attempts = $1, updated_at = now() WHERE id = $2";
            await dbClient.query(failQuery, [newAttempts, TASK_ID]);
        } catch (dbError) {
            log(`Failed to mark task as FAILED: ${dbError.message}`);
        }
        process.exit(1); // 以非零状态码退出,表示失败
    } finally {
        if (browser) {
            await browser.close();
        }
        await dbClient.end();
        log('Worker finished.');
    }
}

run();

这个 worker 脚本是生产级的。它包含了详细的错误处理逻辑,当抓取或存储失败时,会更新任务状态为 FAILED 并增加重试次数。finally 块确保了无论成功还是失败,浏览器实例和数据库连接都会被正确关闭,这是防止资源泄漏的关键。

架构的局限性与未来展望

这个架构解决了很多规模化运行 Puppeteer 的核心问题:通过 OCI 容器实现了环境隔离和依赖管理,通过 CockroachDB 实现了任务持久化和系统韧性,通过 API 网关提供了清晰的服务边界。然而,它并非没有局限性。

首先,任务调度器(Scheduler)在本文中是一个概念性的存在。一个简单的、基于轮询的调度器在小规模时可以工作,但无法高效地扩展。在生产环境中,这部分应该由 Kubernetes 来承担。可以创建一个自定义的 Kubernetes Operator 来监听 tasks 表的变化(例如通过 Debezium CDC 或轮询),并为每个 PENDING 的任务动态创建一个 Kubernetes Job。这能更好地利用集群资源,并提供更复杂的重试和生命周期管理策略。

其次,使用 CockroachDB 的全文检索是一个务实的权衡。它简化了技术栈,但对于复杂的搜索需求,如多语言分词、同义词、权重调整、聚合和分面搜索,功能远不如专门的搜索引擎(如 Elasticsearch 或 OpenSearch)。当搜索成为系统的核心功能而非辅助功能时,引入一个专用的搜索集群,并通过数据管道(如 Kafka Connect)将 task_results 同步过去,将是必然的演进方向。

最后,系统的可观测性仍有待加强。虽然我们加入了一些日志,但一个完整的可观测性体系需要引入结构化日志、分布式追踪(例如通过 OpenTelemetry),以及关键指标的监控(例如任务处理延迟、成功/失败率、数据库连接池使用情况)。这些是确保系统在生产环境中稳定运行并能快速定位问题的基石。


  目录