We faced a classic technical challenge: our core business system is built on CockroachDB, whose geo-partitioned tables give users worldwide low-latency reads and writes. The analytics team, however, needs near-real-time, complex queries over this globally distributed transactional data, and running those queries directly against the production OLTP cluster is unacceptable because it would severely destabilize online traffic.
The goal is clear: build a data pipeline that syncs data efficiently and reliably from the distributed CockroachDB cluster into a dedicated OLAP engine, ClickHouse. The process must have minimal impact on the source database, scale horizontally, and tolerate the network latency and instability that come with pulling data across regions.
The Architectural Crossroads
We evaluated two mainstream approaches.
Option A: A Centralized Scheduling Script
This is the most direct idea: deploy a centralized Python service that connects to CockroachDB with `psycopg2`, transforms the data with `pandas`, and writes to ClickHouse through `clickhouse-driver`.
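A minimal sketch of what that single-process script would look like (connection strings are placeholders, and the table names anticipate the schema defined later in this post):

```python
# option_a_sketch.py -- illustrative only; everything runs in one process.
from datetime import datetime, timezone

import pandas as pd
import psycopg2
from clickhouse_driver import Client as ClickHouseClient

# Every row of the source table flows through this single node.
conn = psycopg2.connect("postgresql://root@localhost:26257/defaultdb?sslmode=disable")
df = pd.read_sql("SELECT * FROM user_events", conn)
conn.close()

# All transformation happens in local memory.
df["payload"] = df["payload"].astype(str)
df["processed_at"] = datetime.now(timezone.utc)
df = df[["id", "user_id", "event_type", "event_timestamp", "payload", "processed_at"]]

# One bulk insert into ClickHouse, again from this single node.
ClickHouseClient(host="localhost").execute(
    "INSERT INTO default.user_events "
    "(id, user_id, event_type, event_timestamp, payload, processed_at) VALUES",
    df.to_dict("records"),
)
```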
- Pros: simple to implement, easy to reason about, and fast to develop for small datasets or single-region deployments.
- Cons:
  - Single-point bottleneck: all network I/O and compute concentrate on one node; as data volume grows, its memory and CPU quickly become the limit.
  - Poor network efficiency: with CockroachDB data spread across the US, Europe, and Asia, wherever this central node is deployed it must absorb at least two trans-oceanic transfers' worth of latency and cost.
  - No elasticity: processing capacity cannot scale with load, and when a sync run takes too long, data freshness suffers.
  - Weak fault tolerance: a node failure aborts the entire sync run.
In our geo-distributed scenario, Option A's drawbacks are fatal, so it was rejected outright.
Option B: Coordination by a Distributed Compute Framework
This option introduces a distributed compute framework to coordinate the whole flow. The read, transform, and write logic is farmed out to multiple compute nodes and executed in parallel. We chose Dask because it integrates seamlessly with the Python data ecosystem (Pandas in particular), is lightweight, and is easy to deploy.
The final architecture choices are:
- Data source (OLTP): CockroachDB. Its `AS OF SYSTEM TIME` clause gives us point-in-time, consistent snapshot reads without creating lock contention against online transactions (see the snippet after this list).
- Processing engine: Dask. The Dask distributed scheduler and workers pull data from different CockroachDB partitions in parallel, run the transformation logic, and write the results to ClickHouse.
- Data destination (OLAP): ClickHouse. Its `MergeTree` engine family provides efficient bulk writes and analytical queries.
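As referenced in the first bullet, a consistent snapshot read is just an ordinary SELECT with an `AS OF SYSTEM TIME` clause. The sketch below (connection string assumed) uses CockroachDB's built-in `follower_read_timestamp()` so the read can also be served by a nearby replica:

```python
import sqlalchemy as sa

# Assumed local connection string matching the docker-compose setup below.
engine = sa.create_engine("postgresql://root@localhost:26257/defaultdb?sslmode=disable")

with engine.connect() as conn:
    # Historical reads never block, and are never blocked by, online writes.
    count = conn.execute(sa.text(
        "SELECT count(*) FROM user_events "
        "AS OF SYSTEM TIME follower_read_timestamp()"
    )).scalar()
    print(f"rows visible at the follower-read timestamp: {count}")
```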
```mermaid
graph TD
    subgraph "Source: Geo-Distributed OLTP"
        CRDB_US[CockroachDB Node - US]
        CRDB_EU[CockroachDB Node - EU]
        CRDB_AP[CockroachDB Node - AP]
    end
    subgraph "Processing Layer: Dask Cluster"
        Dask_Scheduler[Dask Scheduler]
        Dask_Worker1[Dask Worker 1]
        Dask_Worker2[Dask Worker 2]
        Dask_WorkerN[Dask Worker N]
    end
    subgraph "Target: Centralized OLAP"
        ClickHouse[ClickHouse Cluster]
    end
    CRDB_US -- "1. Parallel read (partition)" --> Dask_Worker1
    CRDB_EU -- "1. Parallel read (partition)" --> Dask_Worker2
    CRDB_AP -- "1. Parallel read (partition)" --> Dask_WorkerN
    Dask_Scheduler -- "2. Task scheduling" --> Dask_Worker1
    Dask_Scheduler -- "2. Task scheduling" --> Dask_Worker2
    Dask_Scheduler -- "2. Task scheduling" --> Dask_WorkerN
    Dask_Worker1 -- "3. Transform & aggregate" --> Dask_Worker1
    Dask_Worker2 -- "3. Transform & aggregate" --> Dask_Worker2
    Dask_WorkerN -- "3. Transform & aggregate" --> Dask_WorkerN
    Dask_Worker1 -- "4. Parallel write" --> ClickHouse
    Dask_Worker2 -- "4. Parallel write" --> ClickHouse
    Dask_WorkerN -- "4. Parallel write" --> ClickHouse
```
The core advantage of this architecture is that it spreads the processing load from a single node across the entire Dask cluster. Dask workers can be deployed strategically in the regions where the CockroachDB data lives, minimizing cross-region data transfer.
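One assumed way to express that placement in Dask is worker resources: workers started in each region advertise a custom resource tag, and the read task for that region is submitted with a matching requirement. A sketch (the resource name and read function are hypothetical):

```python
from dask.distributed import Client

# Workers near the EU CockroachDB nodes would be started with, for example:
#   dask-worker tcp://scheduler:8786 --resources "REGION_EU=1"
client = Client("tcp://localhost:8786")  # assumed scheduler address

def read_eu_rows():
    # Placeholder for a region-scoped CockroachDB read (WHERE region = 'eu-west-1').
    ...

# resources= asks the scheduler to place this task only on workers that
# advertise REGION_EU, keeping the heavy read traffic inside that region.
future = client.submit(read_eu_rows, resources={"REGION_EU": 1})
```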
Core Implementation Overview
We will walk through a concrete example of this architecture. Assume we have a table `user_events` that records user behavior.
1. Environment Setup and Configuration
A production-grade implementation needs a full deployment pipeline; here we use a `docker-compose.yml` to simulate a minimal environment containing all the necessary components.
```yaml
# docker-compose.yml
version: '3.8'
services:
  # CockroachDB: a geo-distributed cluster simulation
roach-1:
image: cockroachdb/cockroach:v23.1.9
command: start --insecure --join=roach-1,roach-2,roach-3 --listen-addr=roach-1:26257 --http-addr=roach-1:8080 --locality=region=us-east-1
hostname: roach-1
ports:
- "26257:26257"
- "8080:8080"
volumes:
- roach-data-1:/cockroach/cockroach-data
roach-2:
image: cockroachdb/cockroach:v23.1.9
command: start --insecure --join=roach-1,roach-2,roach-3 --listen-addr=roach-2:26257 --http-addr=roach-2:8080 --locality=region=eu-west-1
hostname: roach-2
volumes:
- roach-data-2:/cockroach/cockroach-data
roach-3:
image: cockroachdb/cockroach:v23.1.9
command: start --insecure --join=roach-1,roach-2,roach-3 --listen-addr=roach-3:26257 --http-addr=roach-3:8080 --locality=region=ap-northeast-1
hostname: roach-3
volumes:
- roach-data-3:/cockroach/cockroach-data
# CockroachDB initialization
roach-init:
image: cockroachdb/cockroach:v23.1.9
command: init --insecure --host=roach-1
depends_on:
- roach-1
- roach-2
- roach-3
# ClickHouse Server
clickhouse-server:
image: clickhouse/clickhouse-server:23.8
ports:
- "8123:8123"
- "9000:9000"
ulimits:
nproc: 65535
nofile:
soft: 262144
hard: 262144
volumes:
- clickhouse-data:/var/lib/clickhouse
# Dask Scheduler
scheduler:
image: ghcr.io/dask/dask:latest
command: dask-scheduler
ports:
- "8786:8786" # Scheduler port
- "8787:8787" # Dashboard port
# Dask Workers
worker:
image: ghcr.io/dask/dask:latest
command: dask-worker scheduler:8786
depends_on:
- scheduler
# In a real setup, you would scale this service
# `docker-compose up --scale worker=4`
volumes:
roach-data-1:
roach-data-2:
roach-data-3:
  clickhouse-data:
```
This configuration simulates CockroachDB nodes in three regions, a ClickHouse server, and a Dask cluster. In production, the Dask workers would run on machines physically closer to the data sources.
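After `docker-compose up -d --scale worker=4`, a quick connectivity check such as the sketch below (endpoints taken from the port mappings above) confirms all three components are reachable before any sync logic runs:

```python
import sqlalchemy as sa
from clickhouse_driver import Client as ClickHouseClient
from dask.distributed import Client as DaskClient

# Dask scheduler exposed on 8786 by the compose file above.
dask_client = DaskClient("tcp://localhost:8786")
print("Dask workers online:", len(dask_client.scheduler_info()["workers"]))

# CockroachDB exposed on 26257 (roach-1).
crdb = sa.create_engine("postgresql://root@localhost:26257/defaultdb?sslmode=disable")
with crdb.connect() as conn:
    print("CockroachDB:", conn.execute(sa.text("SELECT version()")).scalar())

# ClickHouse native protocol exposed on 9000.
print("ClickHouse:", ClickHouseClient(host="localhost").execute("SELECT version()")[0][0])
```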
2. Table Schema Design
CockroachDB source table:
We design a partitioned table so that rows are physically stored on the nodes matching their `region` column.
```sql
-- Connect to roach-1:26257
--
-- First, define regions for the database
ALTER DATABASE defaultdb PRIMARY REGION "us-east-1";
ALTER DATABASE defaultdb ADD REGION "eu-west-1";
ALTER DATABASE defaultdb ADD REGION "ap-northeast-1";
-- Create the table with geo-partitioning
CREATE TABLE user_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id INT,
event_type VARCHAR(50),
event_timestamp TIMESTAMPTZ DEFAULT now(),
payload JSONB,
region crdb_internal_region AS (
CASE
WHEN user_id % 3 = 0 THEN 'us-east-1'
WHEN user_id % 3 = 1 THEN 'eu-west-1'
ELSE 'ap-northeast-1'
END
) STORED,
INDEX (region)
) LOCALITY REGIONAL BY ROW AS region;
-- Insert some sample data
INSERT INTO user_events (user_id, event_type, payload)
SELECT
i,
'login',
json_build_object('ip', '192.168.1.' || (i % 255)::STRING)
FROM generate_series(1, 100000) AS s(i);
```
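A small sanity check (a sketch, reusing the assumed connection string) that the computed `region` column actually spreads the sample rows across all three regions:

```python
import sqlalchemy as sa

engine = sa.create_engine("postgresql://root@localhost:26257/defaultdb?sslmode=disable")
with engine.connect() as conn:
    rows = conn.execute(sa.text(
        "SELECT region, count(*) AS n FROM user_events GROUP BY region ORDER BY region"
    ))
    for region, n in rows:
        # Expect roughly a third of the 100,000 sample rows per region.
        print(f"{region}: {n}")
```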
ClickHouse target table:
To make the sync job re-runnable and idempotent, we use `ReplacingMergeTree`. During background merges it keeps, among rows sharing the same sorting key, only the row with the highest value of the specified version column (here `processed_at`), so replayed inserts converge to a single logical row.
```sql
-- Connect to clickhouse-server:9000
CREATE TABLE default.user_events (
id UUID,
user_id UInt32,
event_type String,
event_timestamp DateTime64(6, 'UTC'),
payload String,
processed_at DateTime64(6, 'UTC')
) ENGINE = ReplacingMergeTree(processed_at)
PRIMARY KEY (id)
ORDER BY (id, event_timestamp);
```
The `processed_at` column is generated by our ETL script and serves as the version column for `ReplacingMergeTree`.
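Since `ReplacingMergeTree` only collapses duplicates during background merges, a re-run of the sync may temporarily leave multiple versions of the same row on disk; reading with `FINAL` shows the deduplicated view. A sketch of how to verify idempotency after running the job twice:

```python
from clickhouse_driver import Client as ClickHouseClient

client = ClickHouseClient(host="localhost")

# The raw count can exceed the logical count right after a second run...
raw = client.execute("SELECT count() FROM default.user_events")[0][0]
# ...while FINAL collapses rows that share the sorting key (id, event_timestamp),
# keeping the version with the highest processed_at.
deduped = client.execute("SELECT count() FROM default.user_events FINAL")[0][0]
print(f"raw rows: {raw}, deduplicated rows: {deduped}")
```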
3. Core Dask Sync Logic
This is the heart of the solution: a Python script that defines the Dask computation graph.
```python
# sync_crdb_to_clickhouse.py
import os
import logging
import time
from datetime import datetime, timezone
import dask.dataframe as dd
from dask.distributed import Client, progress
import pandas as pd
from clickhouse_driver import Client as ClickHouseClient
import sqlalchemy as sa
# --- Configuration ---
# In production, use a more robust configuration system (e.g., Pydantic, Vault)
DASK_SCHEDULER_URL = os.getenv("DASK_SCHEDULER_URL", "tcp://localhost:8786")
CRDB_CONN_STR = os.getenv("CRDB_CONN_STR", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
CLICKHOUSE_HOST = os.getenv("CLICKHOUSE_HOST", "localhost")
# --- Logging Setup ---
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
# --- Core Functions ---
def get_db_engines():
"""Create and return SQLAlchemy engines for source and destination."""
try:
crdb_engine = sa.create_engine(CRDB_CONN_STR)
logger.info("Successfully connected to CockroachDB.")
# ClickHouse client doesn't use SQLAlchemy engine well for bulk inserts
# So we use the native driver client
ch_client = ClickHouseClient(host=CLICKHOUSE_HOST)
logger.info("Successfully connected to ClickHouse.")
return crdb_engine, ch_client
except Exception as e:
logger.error(f"Failed to connect to databases: {e}")
raise
def get_sync_time_window(crdb_engine):
"""
For incremental sync, we would track the last sync time.
For this example, we'll sync the whole table using a consistent snapshot.
"""
with crdb_engine.connect() as conn:
# Use CockroachDB's follower reads for low-impact queries.
# hlc_to_timestamp converts the cluster's logical time to a wall time.
result = conn.execute(sa.text("SELECT hlc_to_timestamp(cluster_logical_timestamp())"))
snapshot_time = result.scalar()
logger.info(f"Using CockroachDB snapshot time: {snapshot_time}")
return snapshot_time
def process_partition_and_write(df: pd.DataFrame, ch_host: str) -> int:
"""
This function runs on each Dask worker.
It takes a pandas DataFrame (a partition of the full dataset),
performs transformations, and writes it to ClickHouse.
"""
if df.empty:
return 0
    # 1. Transformation
    # `id` became the partition index via index_col="id" below; restore it as a column.
    df = df.reset_index()
    # Add the processing timestamp required by ReplacingMergeTree
    df['processed_at'] = datetime.now(timezone.utc)
    # Ensure payload is a string for ClickHouse
    df['payload'] = df['payload'].astype(str)
    # Reorder columns to match the ClickHouse table (drops the source-only `region`)
    df = df[['id', 'user_id', 'event_type', 'event_timestamp', 'payload', 'processed_at']]
# 2. Writing to ClickHouse
# A new client is created here because this function is serialized and sent to workers.
try:
        client = ClickHouseClient(host=ch_host)
        # clickhouse-driver expects rows as tuples or dicts; column-oriented numpy
        # inserts would instead use Client(..., settings={'use_numpy': True}) together
        # with insert_dataframe().
        data = df.to_dict('records')
        client.execute(
            'INSERT INTO default.user_events '
            '(id, user_id, event_type, event_timestamp, payload, processed_at) VALUES',
            data
        )
return len(data)
except Exception as e:
# Proper error handling is critical in distributed tasks.
logger.error(f"Failed to write partition to ClickHouse: {e}")
# Depending on requirements, you could add retry logic or write to a dead-letter queue.
return -1
def main():
logger.info("Starting sync job...")
start_time = time.time()
dask_client = Client(DASK_SCHEDULER_URL)
logger.info(f"Dask client connected. Dashboard at: {dask_client.dashboard_link}")
crdb_engine, ch_client = get_db_engines()
snapshot_time = get_sync_time_window(crdb_engine)
# This is the key part for distributed reading.
# We use `AS OF SYSTEM TIME` to get a consistent, non-blocking read.
# The `npartitions` argument tells Dask how many parallel connections to open.
# A good starting point for `npartitions` is the number of Dask workers.
# In a real geo-distributed setup, we might create a separate query for each region
# to ensure data locality during reads.
query = f"SELECT * FROM user_events AS OF SYSTEM TIME '{snapshot_time.isoformat()}'"
# We specify an index column that Dask can use to partition the query.
# For UUIDs, this is not ideal. A better approach for huge tables
# would be to partition on a numeric, evenly distributed column, like `user_id`.
# For this example, `read_sql_table` with `npartitions` is illustrative.
#
# A more advanced partitioning strategy:
# Instead of one read_sql_query, we could generate multiple queries
# with WHERE clauses for each region and use dask.delayed to build the graph.
# e.g., SELECT ... WHERE region = 'us-east-1', SELECT ... WHERE region = 'eu-west-1'
    ddf = dd.read_sql_query(
        sql=sa.text(query),
        con=CRDB_CONN_STR,  # Dask wants the URI string; each worker creates its own engine
        index_col="id",     # must be a unique, indexed column
        npartitions=4       # should match or exceed the number of workers for parallel reads
    )
logger.info(f"Dask DataFrame created with {ddf.npartitions} partitions.")
# We use `map_partitions` to apply our custom function to each partition (Pandas DataFrame)
# The result of this is a Dask Bag/Series of completion statuses (row counts).
results = ddf.map_partitions(
process_partition_and_write,
ch_host=CLICKHOUSE_HOST,
meta=pd.Series(dtype='int')
).persist()
logger.info("Computation graph submitted to Dask. Monitoring progress...")
progress(results)
processed_counts = results.compute()
total_rows = sum(count for count in processed_counts if count > 0)
failed_partitions = sum(1 for count in processed_counts if count < 0)
end_time = time.time()
logger.info("--- Sync Job Summary ---")
logger.info(f"Total execution time: {end_time - start_time:.2f} seconds")
logger.info(f"Successfully processed {total_rows} rows.")
if failed_partitions > 0:
logger.warning(f"{failed_partitions} partitions failed to process.")
else:
logger.info("All partitions processed successfully.")
if __name__ == "__main__":
    main()
```
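The comments in `main()` mention a more region-aware partitioning strategy. A sketch of that idea, assuming the schema above: build one `dask.delayed` read per region and assemble them with `dd.from_delayed`, so each read can be pinned to a worker in the matching region (see the resources example earlier):

```python
# regional_reads_sketch.py -- per-region parallel reads instead of index-range splits.
import dask
import dask.dataframe as dd
import pandas as pd
import sqlalchemy as sa

REGIONS = ["us-east-1", "eu-west-1", "ap-northeast-1"]

@dask.delayed
def read_region(conn_str: str, region: str, snapshot_time: str) -> pd.DataFrame:
    # Each task builds its own engine on the worker that executes it.
    engine = sa.create_engine(conn_str)
    query = sa.text(
        f"SELECT id, user_id, event_type, event_timestamp, payload "
        f"FROM user_events AS OF SYSTEM TIME '{snapshot_time}' "
        f"WHERE region = :region"
    )
    with engine.connect() as conn:
        return pd.read_sql(query, conn, params={"region": region})

def build_regional_dataframe(conn_str: str, snapshot_time: str) -> dd.DataFrame:
    parts = [read_region(conn_str, region, snapshot_time) for region in REGIONS]
    meta = pd.DataFrame({
        "id": pd.Series(dtype="object"),
        "user_id": pd.Series(dtype="int64"),
        "event_type": pd.Series(dtype="object"),
        "event_timestamp": pd.Series(dtype="datetime64[ns, UTC]"),
        "payload": pd.Series(dtype="object"),
    })
    # One partition per region; the downstream map_partitions call works unchanged.
    return dd.from_delayed(parts, meta=meta)
```

In `main()`, this helper would replace the `dd.read_sql_query` call, with `snapshot_time` still supplied by `get_sync_time_window`.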
Scalability and Limitations of the Architecture
This architecture solves the core problem, but before running it in production you must understand its boundaries and the next steps in its evolution.
Limitations of the current approach:
- Micro-batch, not streaming: this is fundamentally a high-throughput micro-batch sync. Data freshness depends on how often the job runs; for use cases that need second-level latency it may not be enough.
- The Dask scheduler is a single point: the computation is distributed, but the scheduler itself is not. It is fast, yet at very large scale (thousands of workers) it can still become a bottleneck.
- Scans of the source table: every run scans (part or all of) the CockroachDB table. `AS OF SYSTEM TIME` avoids locks, but for very large tables the scan still costs I/O; change data capture (CDC) is the better long-term answer.
- Naive partitioning strategy: `read_sql_query` with `npartitions` relies on an ordered index column for range splitting, which is inefficient with UUID primary keys. A more robust implementation builds the queries by hand, for example reading in parallel per `region` or by hash ranges of `user_id` (as in the per-region sketch after the main script), to get better data locality and read parallelism.
Future optimization paths:
- Introduce CDC for true streaming: evolve the architecture into a genuine streaming pipeline. Capture changes from CockroachDB into Kafka (via its changefeeds or a Debezium-style CDC layer), then have Dask or a dedicated stream processor such as Flink consume from Kafka and write into ClickHouse. This brings latency from minutes down to seconds.
- Use a dedicated workflow orchestrator: hand execution of the Python script over to Apache Airflow or Prefect for stronger scheduling, dependency management, retries, and monitoring/alerting.
- Dynamic resource provisioning: run the Dask cluster on Kubernetes and use tools such as KEDA (Kubernetes-based Event-Driven Autoscaling) to resize the pool of Dask workers based on backlog or queue length, balancing cost and performance.
- Data quality monitoring: add validation steps to the Dask flow (for example with Great Expectations) so data is clean and conforms to expectations before it reaches ClickHouse, and route anomalous records to a dedicated error-handling path, as sketched below.
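As a lightweight stand-in for a full Great Expectations suite, the validation step can start as plain pandas checks applied inside the partition function, routing offending rows aside instead of writing them (the rules and helper name below are assumptions):

```python
import pandas as pd

def validate_partition(df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Split a partition into rows that pass basic quality rules and rows that fail."""
    ok = (
        df["id"].notna()
        & df["user_id"].ge(0)
        & df["event_type"].str.len().between(1, 50)
        & df["event_timestamp"].notna()
    )
    good, bad = df[ok], df[~ok]
    # Inside process_partition_and_write, only `good` would be inserted into
    # ClickHouse; `bad` would go to a dead-letter table or object-store path.
    return good, bad
```

Starting with plain pandas keeps the worker dependencies small; swapping in a richer validation framework later only changes the body of this function.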