基于 GitHub Actions 构建从 SQL 到 NoSQL 的双写数据一致性校验管道


我们的用户中心服务,其核心数据模型一直存在于一个 PostgreSQL 数据库的 usersuser_profiles 表中。随着业务迭代,profile 表的字段变得越来越臃肿,频繁的 ALTER TABLE 操作不仅在生产环境风险极高,也拖慢了开发节奏。更糟糕的是,为了支持一些动态标签和配置,我们妥协性地引入了 jsonb 字段,这本质上是将非结构化数据硬塞进了关系模型,查询和索引的复杂度直线上升。

技术债积累到一定程度,重构势在必行。团队决定将用户配置、标签这类高度动态化的数据迁移到文档型数据库 MongoDB 中,利用其灵活的 Schema 来解决问题。但对于一个 24/7 运行的核心服务,停机迁移是不可接受的方案。唯一的路径是采用“绞杀者模式”(Strangler Fig Pattern),在一段时间内,应用层进行双写:一份写入旧的 PostgreSQL,一份写入新的 MongoDB。

双写引入了新的、更严峻的挑战:如何确保两个异构数据源之间的数据一致性?任何代码逻辑的缺陷、网络抖动或数据库特性的差异都可能导致数据漂移。手动比对数据无异于大海捞针。我们需要一个自动化的、可靠的、集成在开发流程中的校验机制。这个机制必须在代码变更合并到主干之前,就能精准地发现潜在的一致性问题。

最终,我们选择使用 GitHub Actions 来构建这套自动化校验管道。它与我们的代码仓库深度集成,能够在每次 Pull Request 中触发,利用容器技术动态创建隔离的数据库环境,执行深度数据比对,并将结果直接反馈到 PR 状态检查中。这不仅仅是一个 CI 脚本,它是我们迁移过程中保障数据质量的核心防线。

架构设计与校验流程

在深入代码之前,我们先明确整个校验流水线的架构和数据流。当开发者提交一个涉及用户数据读写逻辑的 Pull Request 时,GitHub Actions 会被触发,执行以下流程:

graph TD
    subgraph GitHub Actions Runner
        A[PR Triggered] --> B{Setup Environment};
        B --> C[Start PostgreSQL Container];
        B --> D[Start MongoDB Container];
        C --> E{Seed Databases};
        D --> E;
        E --> F[Run Consistency Verification Script];
        F --> G{Report Results};
    end

    subgraph Verification Logic
        F --> F1[Connect to PG & Mongo];
        F1 --> F2[Fetch Batch from PostgreSQL];
        F2 --> F3[Fetch Corresponding Docs from MongoDB];
        F3 --> F4[Perform Deep Comparison];
        F4 --> F5{Mismatch?};
        F5 -- Yes --> F6[Log Detailed Diff & Exit 1];
        F5 -- No --> F7[Continue to Next Batch];
        F7 --> F2;
        F7 -- All Batches Done --> F8[Log Success & Exit 0];
    end

    G -- Failure --> H[Fail PR Check];
    G -- Success --> I[Pass PR Check];

    A -- "on: pull_request" --> F

这个流程的核心在于,它创建了一个与生产环境隔离但逻辑上一致的微型“沙盒”。在这个沙盒里,我们可以安全地模拟双写后的状态,并运行高强度的校验程序,而无需担心对任何现有环境造成影响。

GitHub Actions 工作流配置

一切始于 .github/workflows/data-consistency-check.yml 文件。这份 YAML 文件定义了整个 CI 作业的生命周期。在真实项目中,配置会更复杂,但这里的骨架是生产级的。

# .github/workflows/data-consistency-check.yml

name: Dual-Write Data Consistency Check

on:
  pull_request:
    paths:
      # 仅当影响核心数据模型或双写逻辑的代码变更时触发
      - 'src/models/**'
      - 'src/services/user_profile_writer.js'
      - 'scripts/verify_consistency.py'
      - '.github/workflows/data-consistency-check.yml'
    branches:
      - main

jobs:
  verify-data-consistency:
    runs-on: ubuntu-latest
    
    # 使用服务容器来启动依赖的数据库
    # 这是 GitHub Actions 的强大功能,可以为作业提供临时的、网络隔离的服务
    services:
      postgres:
        image: postgres:14.5
        # 使用健康检查确保数据库完全启动后再开始作业步骤
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
        env:
          POSTGRES_USER: user
          POSTGRES_PASSWORD: ${{ secrets.TEST_DB_PASSWORD }}
          POSTGRES_DB: test_db
        ports:
          - 5432:5432

      mongodb:
        image: mongo:6.0
        env:
          MONGO_INITDB_ROOT_USERNAME: root
          MONGO_INITDB_ROOT_PASSWORD: ${{ secrets.TEST_DB_PASSWORD }}
        ports:
          - 27017:27017
          
    steps:
      - name: Checkout repository
        uses: actions/checkout@v3

      - name: Set up Python 3.10
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
          cache: 'pip'

      - name: Install Python dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r scripts/requirements.txt

      - name: Wait for databases to be ready
        # 即使有健康检查,有时也需要短暂等待网络和应用层就绪
        run: sleep 15

      - name: Run Data Seeding and Consistency Verification
        # 将数据库凭证通过环境变量安全地传递给脚本
        # 这里使用 localhost 是因为服务容器和作业运行在同一个 Docker 网络中
        env:
          PG_HOST: localhost
          PG_PORT: 5432
          PG_USER: user
          PG_PASSWORD: ${{ secrets.TEST_DB_PASSWORD }}
          PG_DATABASE: test_db
          MONGO_HOST: localhost
          MONGO_PORT: 27017
          MONGO_USER: root
          MONGO_PASSWORD: ${{ secrets.TEST_DB_PASSWORD }}
        run: python scripts/verify_consistency.py

这里的关键点:

  1. 触发路径 (paths): 我们不希望每次提交都运行这个昂贵的作业。它只应该在可能影响数据一致性的核心代码被修改时运行。
  2. 服务容器 (services): 这是实现环境隔离的核心。我们启动了指定版本的 PostgreSQL 和 MongoDB 容器。GitHub Actions 会处理网络配置,让作业步骤可以通过 localhost 直接访问这些服务。
  3. 健康检查 (options): 对于数据库这类启动需要时间的服务,健康检查至关重要。它能确保后续步骤运行时,数据库已经准备好接受连接。
  4. 密钥管理 (secrets): 密码等敏感信息绝不能硬编码。我们使用 GitHub 的加密密钥 secrets.TEST_DB_PASSWORD} 来安全地存储和使用它们。

数据模型与校验脚本

工作流已经就绪,现在来看它执行的核心——verify_consistency.py 脚本。这个脚本负责三件事:连接数据库、植入测试数据、以及执行核心的比对逻辑。

1. 数据库 Schema 定义

首先,我们需要一个有代表性的 PostgreSQL 表结构。

-- a simplified user table for demonstration
CREATE TABLE users (
    id UUID PRIMARY KEY,
    email VARCHAR(255) UNIQUE NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- the profile table we are migrating
CREATE TABLE user_profiles (
    user_id UUID PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE,
    full_name VARCHAR(255),
    bio TEXT,
    last_login TIMESTAMPTZ,
    -- This is the problematic column we want to get rid of
    settings JSONB
);

对应的 MongoDB 文档结构可能如下。注意,settings 被扁平化了,并且字段名也可能为了更清晰而做了调整(例如 _id 对应 user_id)。

{
  "_id": "uuid-goes-here", // Corresponds to user_id
  "email": "[email protected]",
  "fullName": "Test User",
  "biography": "A sample bio.",
  "lastLogin": ISODate("..."),
  "preferences": {
    "theme": "dark",
    "notifications": {
      "push": true,
      "email": false
    }
  },
  "createdAt": ISODate("..."),
  "updatedAt": ISODate("...")
}

2. 核心校验脚本

这是整个方案的基石。一个常见的错误是直接进行字典比较,这在异构数据库之间是行不通的。数据类型(如 TIMESTAMPTZ vs ISODate)、浮点数精度、甚至是 None/null 的表示都可能存在差异。一个健壮的比较器必须能够处理这些细微差别。

# scripts/verify_consistency.py
import os
import sys
import uuid
import json
from datetime import datetime, timezone
from decimal import Decimal

import psycopg2
from psycopg2.extras import RealDictCursor
from pymongo import MongoClient

# --- Configuration from Environment Variables ---
PG_CONFIG = {
    "host": os.environ.get("PG_HOST"),
    "port": os.environ.get("PG_PORT"),
    "user": os.environ.get("PG_USER"),
    "password": os.environ.get("PG_PASSWORD"),
    "dbname": os.environ.get("PG_DATABASE"),
}
MONGO_CONFIG = {
    "host": os.environ.get("MONGO_HOST"),
    "port": int(os.environ.get("MONGO_PORT")),
    "username": os.environ.get("MONGO_USER"),
    "password": os.environ.get("MONGO_PASSWORD"),
}
MONGO_DB_NAME = "user_profiles_db"
MONGO_COLLECTION_NAME = "profiles"


def setup_databases(pg_conn, mongo_client):
    """
    Creates schemas and seeds test data. In a real project,
    this would be more complex, possibly involving a dedicated seeding script.
    """
    print("--- Setting up database schemas and seeding data ---")
    
    # ... (SQL CREATE TABLE statements executed here) ...
    # ... (Seeding logic to insert sample data into both PG and Mongo) ...
    # This part is crucial to ensure the test starts with a known state.
    # For brevity, the full seeding logic is omitted, but it would create
    # a set of users with varying profiles to test different data types and structures.
    
    print("--- Database setup complete ---")


def normalize_value(value):
    """
    Recursively normalize values for comparison.
    - Converts datetimes to timezone-aware UTC strings.
    - Converts Decimals to floats.
    - Handles nested dicts and lists.
    """
    if isinstance(value, datetime):
        return value.astimezone(timezone.utc).isoformat()
    if isinstance(value, Decimal):
        return float(value)
    if isinstance(value, uuid.UUID):
        return str(value)
    if isinstance(value, dict):
        return {k: normalize_value(v) for k, v in value.items()}
    if isinstance(value, list):
        return [normalize_value(v) for v in value]
    return value


def deep_compare(obj1, obj2, path=""):
    """
    Performs a deep, type-aware comparison of two dictionary-like objects.
    Returns a list of discrepancies.
    """
    discrepancies = []
    keys1 = set(obj1.keys())
    keys2 = set(obj2.keys())

    common_keys = keys1.intersection(keys2)
    for key in common_keys:
        val1 = normalize_value(obj1[key])
        val2 = normalize_value(obj2[key])
        current_path = f"{path}.{key}" if path else key

        if isinstance(val1, dict) and isinstance(val2, dict):
            discrepancies.extend(deep_compare(val1, val2, path=current_path))
        elif val1 != val2:
            discrepancy = {
                "path": current_path,
                "source_value": val1,
                "target_value": val2
            }
            discrepancies.append(discrepancy)

    added_keys = keys2 - keys1
    if added_keys:
        discrepancies.append({"path": path, "keys_added_in_target": list(added_keys)})
        
    removed_keys = keys1 - keys2
    if removed_keys:
        discrepancies.append({"path": path, "keys_removed_in_target": list(removed_keys)})

    return discrepancies


def run_verification(pg_conn, mongo_collection):
    """
    The main verification logic.
    """
    print("\n--- Starting data consistency verification ---")
    mismatched_records = []
    
    with pg_conn.cursor(cursor_factory=RealDictCursor) as cursor:
        # In a real scenario, you'd fetch in batches.
        cursor.execute("""
            SELECT
                u.id,
                u.email,
                u.created_at,
                up.full_name,
                up.bio,
                up.last_login,
                up.settings
            FROM users u
            JOIN user_profiles up ON u.id = up.user_id
            ORDER BY u.created_at;
        """)
        
        pg_records = cursor.fetchall()
        print(f"Fetched {len(pg_records)} records from PostgreSQL.")

    for pg_record in pg_records:
        user_id = str(pg_record['id'])
        mongo_doc = mongo_collection.find_one({"_id": user_id})

        if not mongo_doc:
            mismatched_records.append({"user_id": user_id, "error": "Document not found in MongoDB"})
            continue

        # --- Data Transformation: Mapping SQL record to expected Mongo document structure ---
        # This is a critical step. The raw SQL result often needs reshaping to match
        # the target document structure.
        expected_doc = {
            "_id": user_id,
            "email": pg_record['email'],
            "fullName": pg_record['full_name'],
            "biography": pg_record['bio'],
            "lastLogin": pg_record['last_login'],
            "preferences": pg_record['settings'], # Assuming settings jsonb maps directly
            "createdAt": pg_record['created_at'],
            # updatedAt is missing in this query, a potential source of discrepancy
        }

        # The 'updatedAt' field is not selected from PG, we must remove it from the Mongo doc
        # before comparison, or handle it explicitly. This is a common pitfall.
        if 'updatedAt' in mongo_doc:
            del mongo_doc['updatedAt']

        discrepancies = deep_compare(expected_doc, mongo_doc)
        if discrepancies:
            mismatched_records.append({
                "user_id": user_id,
                "discrepancies": discrepancies
            })
    
    return mismatched_records


def main():
    try:
        pg_conn = psycopg2.connect(**PG_CONFIG)
        mongo_client = MongoClient(**MONGO_CONFIG)
        mongo_db = mongo_client[MONGO_DB_NAME]
        mongo_collection = mongo_db[MONGO_COLLECTION_NAME]
        print("Successfully connected to databases.")
    except Exception as e:
        print(f"Error connecting to databases: {e}", file=sys.stderr)
        sys.exit(1)

    # setup_databases(pg_conn, mongo_client) # This would be called to seed data

    mismatches = run_verification(pg_conn, mongo_collection)

    pg_conn.close()
    mongo_client.close()

    if mismatches:
        print("\n--- DATA CONSISTENCY CHECK FAILED ---", file=sys.stderr)
        print(f"Found {len(mismatches)} mismatched records:", file=sys.stderr)
        # Pretty print the JSON for better readability in logs
        print(json.dumps(mismatches, indent=2, default=str), file=sys.stderr)
        sys.exit(1)
    else:
        print("\n--- DATA CONSISTENCY CHECK PASSED ---")
        sys.exit(0)

if __name__ == "__main__":
    main()

这个脚本的精髓在于 deep_comparenormalize_value 函数。它没有天真地假设数据类型会完全一致,而是先将所有待比较的值“规范化”到一个共同的表示形式(例如,所有时间都转为带时区的 ISO 8601 字符串),然后再进行比较。它还能递归地深入嵌套的字典,并明确报告出是值不匹配、目标端多了字段,还是少了字段。这种详尽的报告对于在 CI 日志中快速定位问题至关重要。

当 PR 中引入了一个 bug,比如错误地将 settings.notifications.email 写入了 true,CI 的输出会非常清晰:

--- DATA CONSISTENCY CHECK FAILED ---
Found 1 mismatched records:
[
  {
    "user_id": "a1b2c3d4-e5f6-a7b8-c9d0-e1f2a3b4c5d6",
    "discrepancies": [
      {
        "path": "preferences.notifications.email",
        "source_value": false,
        "target_value": true
      }
    ]
  }
]
Error: Process completed with exit code 1.

这样的反馈能让开发者在代码合并前就立即修复问题。

遗留问题与未来迭代路径

这套基于 GitHub Actions 的校验管道有效地保障了我们在开发阶段的数据一致性,但它并非银弹。在真实的大规模迁移项目中,它只是第一道防线。

首先,当前方案是在 PR 级别对样本数据进行校验,它无法捕捉到生产环境下因并发、竞争条件等导致的随机不一致。一个完备的方案需要一个并行的、持续运行的审计服务,该服务订阅生产数据库(或其只读副本)的变更数据捕获(CDC)流,例如使用 Debezium,进行近乎实时的在线比对。

其次,当前的校验脚本是全量拉取数据进行比对。当测试数据集增长时,CI 的运行时间会变得无法接受。优化策略包括只校验最近被修改的记录(基于 updated_at 时间戳),或者采用随机抽样校验。

最后,这个管道只解决了“校验”问题,而没有解决“迁移”本身。一个完整的迁移工作流还需要包含 schema 迁移工具(如 Flyway 或 Alembic for SQL,以及自定义脚本 for MongoDB)的自动化执行,并将其纳入到版本控制和 CI/CD 流程中,实现真正的基础设施即代码。当前的设计为集成这些工具打下了坚实的基础。


  目录