我们的用户中心服务,其核心数据模型一直存在于一个 PostgreSQL 数据库的 users 和 user_profiles 表中。随着业务迭代,profile 表的字段变得越来越臃肿,频繁的 ALTER TABLE 操作不仅在生产环境风险极高,也拖慢了开发节奏。更糟糕的是,为了支持一些动态标签和配置,我们妥协性地引入了 jsonb 字段,这本质上是将非结构化数据硬塞进了关系模型,查询和索引的复杂度直线上升。
技术债积累到一定程度,重构势在必行。团队决定将用户配置、标签这类高度动态化的数据迁移到文档型数据库 MongoDB 中,利用其灵活的 Schema 来解决问题。但对于一个 24/7 运行的核心服务,停机迁移是不可接受的方案。唯一的路径是采用“绞杀者模式”(Strangler Fig Pattern),在一段时间内,应用层进行双写:一份写入旧的 PostgreSQL,一份写入新的 MongoDB。
双写引入了新的、更严峻的挑战:如何确保两个异构数据源之间的数据一致性?任何代码逻辑的缺陷、网络抖动或数据库特性的差异都可能导致数据漂移。手动比对数据无异于大海捞针。我们需要一个自动化的、可靠的、集成在开发流程中的校验机制。这个机制必须在代码变更合并到主干之前,就能精准地发现潜在的一致性问题。
最终,我们选择使用 GitHub Actions 来构建这套自动化校验管道。它与我们的代码仓库深度集成,能够在每次 Pull Request 中触发,利用容器技术动态创建隔离的数据库环境,执行深度数据比对,并将结果直接反馈到 PR 状态检查中。这不仅仅是一个 CI 脚本,它是我们迁移过程中保障数据质量的核心防线。
架构设计与校验流程
在深入代码之前,我们先明确整个校验流水线的架构和数据流。当开发者提交一个涉及用户数据读写逻辑的 Pull Request 时,GitHub Actions 会被触发,执行以下流程:
graph TD
subgraph GitHub Actions Runner
A[PR Triggered] --> B{Setup Environment};
B --> C[Start PostgreSQL Container];
B --> D[Start MongoDB Container];
C --> E{Seed Databases};
D --> E;
E --> F[Run Consistency Verification Script];
F --> G{Report Results};
end
subgraph Verification Logic
F --> F1[Connect to PG & Mongo];
F1 --> F2[Fetch Batch from PostgreSQL];
F2 --> F3[Fetch Corresponding Docs from MongoDB];
F3 --> F4[Perform Deep Comparison];
F4 --> F5{Mismatch?};
F5 -- Yes --> F6[Log Detailed Diff & Exit 1];
F5 -- No --> F7[Continue to Next Batch];
F7 --> F2;
F7 -- All Batches Done --> F8[Log Success & Exit 0];
end
G -- Failure --> H[Fail PR Check];
G -- Success --> I[Pass PR Check];
A -- "on: pull_request" --> F
这个流程的核心在于,它创建了一个与生产环境隔离但逻辑上一致的微型“沙盒”。在这个沙盒里,我们可以安全地模拟双写后的状态,并运行高强度的校验程序,而无需担心对任何现有环境造成影响。
GitHub Actions 工作流配置
一切始于 .github/workflows/data-consistency-check.yml 文件。这份 YAML 文件定义了整个 CI 作业的生命周期。在真实项目中,配置会更复杂,但这里的骨架是生产级的。
# .github/workflows/data-consistency-check.yml
name: Dual-Write Data Consistency Check
on:
pull_request:
paths:
# 仅当影响核心数据模型或双写逻辑的代码变更时触发
- 'src/models/**'
- 'src/services/user_profile_writer.js'
- 'scripts/verify_consistency.py'
- '.github/workflows/data-consistency-check.yml'
branches:
- main
jobs:
verify-data-consistency:
runs-on: ubuntu-latest
# 使用服务容器来启动依赖的数据库
# 这是 GitHub Actions 的强大功能,可以为作业提供临时的、网络隔离的服务
services:
postgres:
image: postgres:14.5
# 使用健康检查确保数据库完全启动后再开始作业步骤
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
env:
POSTGRES_USER: user
POSTGRES_PASSWORD: ${{ secrets.TEST_DB_PASSWORD }}
POSTGRES_DB: test_db
ports:
- 5432:5432
mongodb:
image: mongo:6.0
env:
MONGO_INITDB_ROOT_USERNAME: root
MONGO_INITDB_ROOT_PASSWORD: ${{ secrets.TEST_DB_PASSWORD }}
ports:
- 27017:27017
steps:
- name: Checkout repository
uses: actions/checkout@v3
- name: Set up Python 3.10
uses: actions/setup-python@v4
with:
python-version: '3.10'
cache: 'pip'
- name: Install Python dependencies
run: |
python -m pip install --upgrade pip
pip install -r scripts/requirements.txt
- name: Wait for databases to be ready
# 即使有健康检查,有时也需要短暂等待网络和应用层就绪
run: sleep 15
- name: Run Data Seeding and Consistency Verification
# 将数据库凭证通过环境变量安全地传递给脚本
# 这里使用 localhost 是因为服务容器和作业运行在同一个 Docker 网络中
env:
PG_HOST: localhost
PG_PORT: 5432
PG_USER: user
PG_PASSWORD: ${{ secrets.TEST_DB_PASSWORD }}
PG_DATABASE: test_db
MONGO_HOST: localhost
MONGO_PORT: 27017
MONGO_USER: root
MONGO_PASSWORD: ${{ secrets.TEST_DB_PASSWORD }}
run: python scripts/verify_consistency.py
这里的关键点:
- 触发路径 (
paths): 我们不希望每次提交都运行这个昂贵的作业。它只应该在可能影响数据一致性的核心代码被修改时运行。 - 服务容器 (
services): 这是实现环境隔离的核心。我们启动了指定版本的 PostgreSQL 和 MongoDB 容器。GitHub Actions 会处理网络配置,让作业步骤可以通过localhost直接访问这些服务。 - 健康检查 (
options): 对于数据库这类启动需要时间的服务,健康检查至关重要。它能确保后续步骤运行时,数据库已经准备好接受连接。 - 密钥管理 (
secrets): 密码等敏感信息绝不能硬编码。我们使用 GitHub 的加密密钥secrets.TEST_DB_PASSWORD}来安全地存储和使用它们。
数据模型与校验脚本
工作流已经就绪,现在来看它执行的核心——verify_consistency.py 脚本。这个脚本负责三件事:连接数据库、植入测试数据、以及执行核心的比对逻辑。
1. 数据库 Schema 定义
首先,我们需要一个有代表性的 PostgreSQL 表结构。
-- a simplified user table for demonstration
CREATE TABLE users (
id UUID PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- the profile table we are migrating
CREATE TABLE user_profiles (
user_id UUID PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE,
full_name VARCHAR(255),
bio TEXT,
last_login TIMESTAMPTZ,
-- This is the problematic column we want to get rid of
settings JSONB
);
对应的 MongoDB 文档结构可能如下。注意,settings 被扁平化了,并且字段名也可能为了更清晰而做了调整(例如 _id 对应 user_id)。
{
"_id": "uuid-goes-here", // Corresponds to user_id
"email": "[email protected]",
"fullName": "Test User",
"biography": "A sample bio.",
"lastLogin": ISODate("..."),
"preferences": {
"theme": "dark",
"notifications": {
"push": true,
"email": false
}
},
"createdAt": ISODate("..."),
"updatedAt": ISODate("...")
}
2. 核心校验脚本
这是整个方案的基石。一个常见的错误是直接进行字典比较,这在异构数据库之间是行不通的。数据类型(如 TIMESTAMPTZ vs ISODate)、浮点数精度、甚至是 None/null 的表示都可能存在差异。一个健壮的比较器必须能够处理这些细微差别。
# scripts/verify_consistency.py
import os
import sys
import uuid
import json
from datetime import datetime, timezone
from decimal import Decimal
import psycopg2
from psycopg2.extras import RealDictCursor
from pymongo import MongoClient
# --- Configuration from Environment Variables ---
PG_CONFIG = {
"host": os.environ.get("PG_HOST"),
"port": os.environ.get("PG_PORT"),
"user": os.environ.get("PG_USER"),
"password": os.environ.get("PG_PASSWORD"),
"dbname": os.environ.get("PG_DATABASE"),
}
MONGO_CONFIG = {
"host": os.environ.get("MONGO_HOST"),
"port": int(os.environ.get("MONGO_PORT")),
"username": os.environ.get("MONGO_USER"),
"password": os.environ.get("MONGO_PASSWORD"),
}
MONGO_DB_NAME = "user_profiles_db"
MONGO_COLLECTION_NAME = "profiles"
def setup_databases(pg_conn, mongo_client):
"""
Creates schemas and seeds test data. In a real project,
this would be more complex, possibly involving a dedicated seeding script.
"""
print("--- Setting up database schemas and seeding data ---")
# ... (SQL CREATE TABLE statements executed here) ...
# ... (Seeding logic to insert sample data into both PG and Mongo) ...
# This part is crucial to ensure the test starts with a known state.
# For brevity, the full seeding logic is omitted, but it would create
# a set of users with varying profiles to test different data types and structures.
print("--- Database setup complete ---")
def normalize_value(value):
"""
Recursively normalize values for comparison.
- Converts datetimes to timezone-aware UTC strings.
- Converts Decimals to floats.
- Handles nested dicts and lists.
"""
if isinstance(value, datetime):
return value.astimezone(timezone.utc).isoformat()
if isinstance(value, Decimal):
return float(value)
if isinstance(value, uuid.UUID):
return str(value)
if isinstance(value, dict):
return {k: normalize_value(v) for k, v in value.items()}
if isinstance(value, list):
return [normalize_value(v) for v in value]
return value
def deep_compare(obj1, obj2, path=""):
"""
Performs a deep, type-aware comparison of two dictionary-like objects.
Returns a list of discrepancies.
"""
discrepancies = []
keys1 = set(obj1.keys())
keys2 = set(obj2.keys())
common_keys = keys1.intersection(keys2)
for key in common_keys:
val1 = normalize_value(obj1[key])
val2 = normalize_value(obj2[key])
current_path = f"{path}.{key}" if path else key
if isinstance(val1, dict) and isinstance(val2, dict):
discrepancies.extend(deep_compare(val1, val2, path=current_path))
elif val1 != val2:
discrepancy = {
"path": current_path,
"source_value": val1,
"target_value": val2
}
discrepancies.append(discrepancy)
added_keys = keys2 - keys1
if added_keys:
discrepancies.append({"path": path, "keys_added_in_target": list(added_keys)})
removed_keys = keys1 - keys2
if removed_keys:
discrepancies.append({"path": path, "keys_removed_in_target": list(removed_keys)})
return discrepancies
def run_verification(pg_conn, mongo_collection):
"""
The main verification logic.
"""
print("\n--- Starting data consistency verification ---")
mismatched_records = []
with pg_conn.cursor(cursor_factory=RealDictCursor) as cursor:
# In a real scenario, you'd fetch in batches.
cursor.execute("""
SELECT
u.id,
u.email,
u.created_at,
up.full_name,
up.bio,
up.last_login,
up.settings
FROM users u
JOIN user_profiles up ON u.id = up.user_id
ORDER BY u.created_at;
""")
pg_records = cursor.fetchall()
print(f"Fetched {len(pg_records)} records from PostgreSQL.")
for pg_record in pg_records:
user_id = str(pg_record['id'])
mongo_doc = mongo_collection.find_one({"_id": user_id})
if not mongo_doc:
mismatched_records.append({"user_id": user_id, "error": "Document not found in MongoDB"})
continue
# --- Data Transformation: Mapping SQL record to expected Mongo document structure ---
# This is a critical step. The raw SQL result often needs reshaping to match
# the target document structure.
expected_doc = {
"_id": user_id,
"email": pg_record['email'],
"fullName": pg_record['full_name'],
"biography": pg_record['bio'],
"lastLogin": pg_record['last_login'],
"preferences": pg_record['settings'], # Assuming settings jsonb maps directly
"createdAt": pg_record['created_at'],
# updatedAt is missing in this query, a potential source of discrepancy
}
# The 'updatedAt' field is not selected from PG, we must remove it from the Mongo doc
# before comparison, or handle it explicitly. This is a common pitfall.
if 'updatedAt' in mongo_doc:
del mongo_doc['updatedAt']
discrepancies = deep_compare(expected_doc, mongo_doc)
if discrepancies:
mismatched_records.append({
"user_id": user_id,
"discrepancies": discrepancies
})
return mismatched_records
def main():
try:
pg_conn = psycopg2.connect(**PG_CONFIG)
mongo_client = MongoClient(**MONGO_CONFIG)
mongo_db = mongo_client[MONGO_DB_NAME]
mongo_collection = mongo_db[MONGO_COLLECTION_NAME]
print("Successfully connected to databases.")
except Exception as e:
print(f"Error connecting to databases: {e}", file=sys.stderr)
sys.exit(1)
# setup_databases(pg_conn, mongo_client) # This would be called to seed data
mismatches = run_verification(pg_conn, mongo_collection)
pg_conn.close()
mongo_client.close()
if mismatches:
print("\n--- DATA CONSISTENCY CHECK FAILED ---", file=sys.stderr)
print(f"Found {len(mismatches)} mismatched records:", file=sys.stderr)
# Pretty print the JSON for better readability in logs
print(json.dumps(mismatches, indent=2, default=str), file=sys.stderr)
sys.exit(1)
else:
print("\n--- DATA CONSISTENCY CHECK PASSED ---")
sys.exit(0)
if __name__ == "__main__":
main()
这个脚本的精髓在于 deep_compare 和 normalize_value 函数。它没有天真地假设数据类型会完全一致,而是先将所有待比较的值“规范化”到一个共同的表示形式(例如,所有时间都转为带时区的 ISO 8601 字符串),然后再进行比较。它还能递归地深入嵌套的字典,并明确报告出是值不匹配、目标端多了字段,还是少了字段。这种详尽的报告对于在 CI 日志中快速定位问题至关重要。
当 PR 中引入了一个 bug,比如错误地将 settings.notifications.email 写入了 true,CI 的输出会非常清晰:
--- DATA CONSISTENCY CHECK FAILED ---
Found 1 mismatched records:
[
{
"user_id": "a1b2c3d4-e5f6-a7b8-c9d0-e1f2a3b4c5d6",
"discrepancies": [
{
"path": "preferences.notifications.email",
"source_value": false,
"target_value": true
}
]
}
]
Error: Process completed with exit code 1.
这样的反馈能让开发者在代码合并前就立即修复问题。
遗留问题与未来迭代路径
这套基于 GitHub Actions 的校验管道有效地保障了我们在开发阶段的数据一致性,但它并非银弹。在真实的大规模迁移项目中,它只是第一道防线。
首先,当前方案是在 PR 级别对样本数据进行校验,它无法捕捉到生产环境下因并发、竞争条件等导致的随机不一致。一个完备的方案需要一个并行的、持续运行的审计服务,该服务订阅生产数据库(或其只读副本)的变更数据捕获(CDC)流,例如使用 Debezium,进行近乎实时的在线比对。
其次,当前的校验脚本是全量拉取数据进行比对。当测试数据集增长时,CI 的运行时间会变得无法接受。优化策略包括只校验最近被修改的记录(基于 updated_at 时间戳),或者采用随机抽样校验。
最后,这个管道只解决了“校验”问题,而没有解决“迁移”本身。一个完整的迁移工作流还需要包含 schema 迁移工具(如 Flyway 或 Alembic for SQL,以及自定义脚本 for MongoDB)的自动化执行,并将其纳入到版本控制和 CI/CD 流程中,实现真正的基础设施即代码。当前的设计为集成这些工具打下了坚实的基础。