利用 Dask 构建从 geo-distributed CockroachDB 到 ClickHouse 的弹性数据同步架构


一个典型的技术挑战摆在面前:我们的核心业务系统构建在 CockroachDB 之上,其 geo-partitioned 特性完美支撑了全球用户的低延迟读写。然而,分析团队需要对这些遍布全球的交易数据进行近实时的复杂查询,直接在生产 OLTP 集群上运行这些查询是不可接受的,它会严重冲击在线业务的稳定性。

目标很明确:构建一个数据管道,将数据从分布式的 CockroachDB 集群高效、可靠地同步到专用的 OLAP 引擎 ClickHouse 中。这个过程必须对源数据库影响最小,具备水平扩展能力,并且能够处理跨地域数据拉取带来的网络延迟和不稳定性。

架构决策的十字路口

在技术选型上,我们评估了两种主流方案。

方案A:中心化调度脚本

这是最直接的思路。部署一个中心化的 Python 服务,使用 psycopg2 连接 CockroachDB,用 pandas 进行数据转换,然后通过 clickhouse-driver 写入 ClickHouse。

  • 优点: 实现简单,逻辑清晰,对于小规模数据或单区域部署场景,开发速度快。
  • 缺点:
    1. 单点瓶颈: 所有的网络 I/O 和计算都集中在一个节点上。当数据量增长时,该节点的内存和 CPU 会迅速成为瓶颈。
    2. 网络效率低下: 如果 CockroachDB 的数据分布在美、欧、亚三个区域,这个中心节点无论部署在哪里,都必须承受至少两份跨洋网络传输的延迟和成本。
    3. 缺乏弹性: 无法根据负载动态扩展处理能力。当数据同步任务耗时过长,会影响数据的新鲜度。
    4. 容错性差: 节点故障将导致整个同步任务中断。

在我们的 geo-distributed 场景下,方案 A 的缺点是致命的,因此被直接否决。

方案B:分布式计算框架协调

这个方案引入一个分布式计算框架来协调整个流程。数据读取、转换和写入的逻辑被分发到多个计算节点上并行执行。我们选择了 Dask,因为它与 Python 数据生态(特别是 Pandas)无缝集成,轻量级且易于部署。

最终的架构选型如下:

  • 数据源 (OLTP): CockroachDB。利用其 AS OF SYSTEM TIME 特性进行时间点一致性快照读取,避免对在线事务产生锁竞争。
  • 处理引擎: Dask。Dask 分布式调度器和工作节点(Worker)负责并行地从 CockroachDB 的不同分区拉取数据、执行转换逻辑,并将结果写入 ClickHouse。
  • 数据目的地 (OLAP): ClickHouse。利用其 MergeTree 引擎家族进行高效的数据写入和查询。
graph TD
    subgraph "源: Geo-Distributed OLTP"
        CRDB_US[CockroachDB Node - US]
        CRDB_EU[CockroachDB Node - EU]
        CRDB_AP[CockroachDB Node - AP]
    end

    subgraph "处理层: Dask Cluster"
        Dask_Scheduler[Dask Scheduler]
        Dask_Worker1[Dask Worker 1]
        Dask_Worker2[Dask Worker 2]
        Dask_WorkerN[Dask Worker N]
    end

    subgraph "目标: Centralized OLAP"
        ClickHouse[ClickHouse Cluster]
    end

    CRDB_US -- "1. 并行读取 (分区)" --> Dask_Worker1
    CRDB_EU -- "1. 并行读取 (分区)" --> Dask_Worker2
    CRDB_AP -- "1. 并行读取 (分区)" --> Dask_WorkerN

    Dask_Scheduler -- "2. 任务调度" --> Dask_Worker1
    Dask_Scheduler -- "2. 任务调度" --> Dask_Worker2
    Dask_Scheduler -- "2. 任务调度" --> Dask_WorkerN

    Dask_Worker1 -- "3. 数据转换与聚合" --> Dask_Worker1
    Dask_Worker2 -- "3. 数据转换与聚合" --> Dask_Worker2
    Dask_WorkerN -- "3. 数据转换与聚合" --> Dask_WorkerN

    Dask_Worker1 -- "4. 并行写入" --> ClickHouse
    Dask_Worker2 -- "4. 并行写入" --> ClickHouse
    Dask_WorkerN -- "4. 并行写入" --> ClickHouse

这个架构的核心优势在于,它将数据处理的压力从单一节点分散到整个 Dask 集群。Dask Worker 可以策略性地部署在靠近 CockroachDB 数据所在的区域,从而最小化跨区域数据传输。

核心实现概览

我们将通过一个具体的例子来展示这个架构的实现。假设我们有一个记录用户行为的表 user_events

1. 环境准备与配置

一个生产级的实现需要一个完整的部署脚本,这里我们用 docker-compose.yml 来模拟一个最小化的环境,包含所有必要的组件。

# docker-compose.yml
version: '3.8'

services:
  # CockroachDB a geo-distributed cluster simulation
  roach-1:
    image: cockroachdb/cockroach:v23.1.9
    command: start --insecure --join=roach-1,roach-2,roach-3 --listen-addr=roach-1:26257 --http-addr=roach-1:8080 --locality=region=us-east-1
    hostname: roach-1
    ports:
      - "26257:26257"
      - "8080:8080"
    volumes:
      - roach-data-1:/cockroach/cockroach-data

  roach-2:
    image: cockroachdb/cockroach:v23.1.9
    command: start --insecure --join=roach-1,roach-2,roach-3 --listen-addr=roach-2:26257 --http-addr=roach-2:8080 --locality=region=eu-west-1
    hostname: roach-2
    volumes:
      - roach-data-2:/cockroach/cockroach-data

  roach-3:
    image: cockroachdb/cockroach:v23.1.9
    command: start --insecure --join=roach-1,roach-2,roach-3 --listen-addr=roach-3:26257 --http-addr=roach-3:8080 --locality=region=ap-northeast-1
    hostname: roach-3
    volumes:
      - roach-data-3:/cockroach/cockroach-data

  # CockroachDB initialization
  roach-init:
    image: cockroachdb/cockroach:v23.1.9
    command: init --insecure --host=roach-1
    depends_on:
      - roach-1
      - roach-2
      - roach-3

  # ClickHouse Server
  clickhouse-server:
    image: clickhouse/clickhouse-server:23.8
    ports:
      - "8123:8123"
      - "9000:9000"
    ulimits:
      nproc: 65535
      nofile:
        soft: 262144
        hard: 262144
    volumes:
      - clickhouse-data:/var/lib/clickhouse

  # Dask Scheduler
  scheduler:
    image: ghcr.io/dask/dask:latest
    command: dask-scheduler
    ports:
      - "8786:8786" # Scheduler port
      - "8787:8787" # Dashboard port

  # Dask Workers
  worker:
    image: ghcr.io/dask/dask:latest
    command: dask-worker scheduler:8786
    depends_on:
      - scheduler
    # In a real setup, you would scale this service
    # `docker-compose up --scale worker=4`

volumes:
  roach-data-1:
  roach-data-2:
  roach-data-3:
  clickhouse-data:

这个配置模拟了三个区域的 CockroachDB 节点、一个 ClickHouse 服务和 Dask 集群。在生产环境中,Dask worker 会部署在物理上更靠近数据源的机器上。

2. 数据表结构设计

CockroachDB 源表:
我们设计一个分区表,让数据根据 region 字段物理地存放在对应的节点上。

-- Connect to roach-1:26257
--
-- First, define regions for the database
ALTER DATABASE defaultdb PRIMARY REGION "us-east-1";
ALTER DATABASE defaultdb ADD REGION "eu-west-1";
ALTER DATABASE defaultdb ADD REGION "ap-northeast-1";

-- Create the table with geo-partitioning
CREATE TABLE user_events (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id INT,
    event_type VARCHAR(50),
    event_timestamp TIMESTAMPTZ DEFAULT now(),
    payload JSONB,
    region crdb_internal_region AS (
        CASE
            WHEN user_id % 3 = 0 THEN 'us-east-1'
            WHEN user_id % 3 = 1 THEN 'eu-west-1'
            ELSE 'ap-northeast-1'
        END
    ) STORED,
    INDEX (region)
) LOCALITY REGIONAL BY ROW;

-- Insert some sample data
INSERT INTO user_events (user_id, event_type, payload)
SELECT
    i,
    'login',
    json_build_object('ip', '192.168.1.' || (i % 255)::STRING)
FROM generate_series(1, 100000) AS s(i);

ClickHouse 目标表:
为了保证同步任务的可重入性和幂等性,我们使用 ReplacingMergeTree。它可以在插入新数据时,根据指定的版本列(这里是 event_timestamp)替换掉具有相同主键的旧数据。

-- Connect to clickhouse-server:9000
CREATE TABLE default.user_events (
    id UUID,
    user_id UInt32,
    event_type String,
    event_timestamp DateTime64(6, 'UTC'),
    payload String,
    processed_at DateTime64(6, 'UTC')
) ENGINE = ReplacingMergeTree(processed_at)
PRIMARY KEY (id)
ORDER BY (id, event_timestamp);

这里的 processed_at 字段由我们的 ETL 脚本生成,作为 ReplacingMergeTree 的版本列。

3. Dask 同步核心逻辑

这是整个方案的核心,一个 Python 脚本,它定义了 Dask 的计算图。

# sync_crdb_to_clickhouse.py
import os
import logging
import time
from datetime import datetime, timezone

import dask.dataframe as dd
from dask.distributed import Client, progress
import pandas as pd
from clickhouse_driver import Client as ClickHouseClient
import sqlalchemy as sa

# --- Configuration ---
# In production, use a more robust configuration system (e.g., Pydantic, Vault)
DASK_SCHEDULER_URL = os.getenv("DASK_SCHEDULER_URL", "tcp://localhost:8786")
CRDB_CONN_STR = os.getenv("CRDB_CONN_STR", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
CLICKHOUSE_HOST = os.getenv("CLICKHOUSE_HOST", "localhost")

# --- Logging Setup ---
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)

# --- Core Functions ---

def get_db_engines():
    """Create and return SQLAlchemy engines for source and destination."""
    try:
        crdb_engine = sa.create_engine(CRDB_CONN_STR)
        logger.info("Successfully connected to CockroachDB.")
        
        # ClickHouse client doesn't use SQLAlchemy engine well for bulk inserts
        # So we use the native driver client
        ch_client = ClickHouseClient(host=CLICKHOUSE_HOST)
        logger.info("Successfully connected to ClickHouse.")
        
        return crdb_engine, ch_client
    except Exception as e:
        logger.error(f"Failed to connect to databases: {e}")
        raise

def get_sync_time_window(crdb_engine):
    """
    For incremental sync, we would track the last sync time.
    For this example, we'll sync the whole table using a consistent snapshot.
    """
    with crdb_engine.connect() as conn:
        # Use CockroachDB's follower reads for low-impact queries.
        # hlc_to_timestamp converts the cluster's logical time to a wall time.
        result = conn.execute(sa.text("SELECT hlc_to_timestamp(cluster_logical_timestamp())"))
        snapshot_time = result.scalar()
        logger.info(f"Using CockroachDB snapshot time: {snapshot_time}")
        return snapshot_time

def process_partition_and_write(df: pd.DataFrame, ch_host: str) -> int:
    """
    This function runs on each Dask worker.
    It takes a pandas DataFrame (a partition of the full dataset),
    performs transformations, and writes it to ClickHouse.
    """
    if df.empty:
        return 0

    # 1. Transformation
    # Add the processing timestamp required by ReplacingMergeTree
    df['processed_at'] = datetime.now(timezone.utc)
    # Ensure payload is a string for ClickHouse
    df['payload'] = df['payload'].astype(str)
    
    # Reorder columns to match ClickHouse table
    df = df[['id', 'user_id', 'event_type', 'event_timestamp', 'payload', 'processed_at']]

    # 2. Writing to ClickHouse
    # A new client is created here because this function is serialized and sent to workers.
    try:
        client = ClickHouseClient(host=ch_host)
        # Using to_records(index=False) is highly efficient
        data = list(df.to_records(index=False))
        
        # The settings dict is important for performance and handling large blocks
        settings = {'use_numpy': True}
        client.execute(
            'INSERT INTO default.user_events VALUES',
            data,
            settings=settings
        )
        return len(data)
    except Exception as e:
        # Proper error handling is critical in distributed tasks.
        logger.error(f"Failed to write partition to ClickHouse: {e}")
        # Depending on requirements, you could add retry logic or write to a dead-letter queue.
        return -1


def main():
    logger.info("Starting sync job...")
    start_time = time.time()

    dask_client = Client(DASK_SCHEDULER_URL)
    logger.info(f"Dask client connected. Dashboard at: {dask_client.dashboard_link}")
    
    crdb_engine, ch_client = get_db_engines()
    
    snapshot_time = get_sync_time_window(crdb_engine)
    
    # This is the key part for distributed reading.
    # We use `AS OF SYSTEM TIME` to get a consistent, non-blocking read.
    # The `npartitions` argument tells Dask how many parallel connections to open.
    # A good starting point for `npartitions` is the number of Dask workers.
    # In a real geo-distributed setup, we might create a separate query for each region
    # to ensure data locality during reads.
    query = f"SELECT * FROM user_events AS OF SYSTEM TIME '{snapshot_time.isoformat()}'"
    
    # We specify an index column that Dask can use to partition the query.
    # For UUIDs, this is not ideal. A better approach for huge tables
    # would be to partition on a numeric, evenly distributed column, like `user_id`.
    # For this example, `read_sql_table` with `npartitions` is illustrative.
    #
    # A more advanced partitioning strategy:
    # Instead of one read_sql_query, we could generate multiple queries
    # with WHERE clauses for each region and use dask.delayed to build the graph.
    # e.g., SELECT ... WHERE region = 'us-east-1', SELECT ... WHERE region = 'eu-west-1'
    ddf = dd.read_sql_query(
        sql=sa.text(query),
        con=crdb_engine,
        index_col="id", # This must be a unique, indexed column
        npartitions=4 # Should match or exceed number of workers for parallel reads
    )
    
    logger.info(f"Dask DataFrame created with {ddf.npartitions} partitions.")
    
    # We use `map_partitions` to apply our custom function to each partition (Pandas DataFrame)
    # The result of this is a Dask Bag/Series of completion statuses (row counts).
    results = ddf.map_partitions(
        process_partition_and_write,
        ch_host=CLICKHOUSE_HOST,
        meta=pd.Series(dtype='int')
    ).persist()

    logger.info("Computation graph submitted to Dask. Monitoring progress...")
    progress(results)
    
    processed_counts = results.compute()
    total_rows = sum(count for count in processed_counts if count > 0)
    failed_partitions = sum(1 for count in processed_counts if count < 0)

    end_time = time.time()
    
    logger.info("--- Sync Job Summary ---")
    logger.info(f"Total execution time: {end_time - start_time:.2f} seconds")
    logger.info(f"Successfully processed {total_rows} rows.")
    if failed_partitions > 0:
        logger.warning(f"{failed_partitions} partitions failed to process.")
    else:
        logger.info("All partitions processed successfully.")

if __name__ == "__main__":
    main()

架构的扩展性与局限性

这个架构虽然解决了核心问题,但在生产环境中应用时,必须清楚它的边界和下一步的演进方向。

当前方案的局限性:

  1. 微批次而非流式: 本质上这是一个高性能的微批次(micro-batch)同步方案。数据延迟取决于同步任务的执行频率。对于需要秒级延迟的场景,此方案可能不够用。
  2. Dask 调度器单点: 虽然 Dask 的计算是分布式的,但调度器(Scheduler)本身是一个单点。尽管它性能很高,但在超大规模集群(数千 worker)下,它也可能成为瓶颈。
  3. 对源表扫描: 每次同步都是对 CockroachDB 表的一次(部分或全部)扫描。尽管使用了 AS OF SYSTEM TIME 避免了锁,但对于超大规模表,这仍然会消耗 I/O。更优的方案是基于变更数据捕获(CDC)。
  4. 简单的分区策略: read_sql_querynpartitions 依赖于一个有序的索引列来进行范围切分,这对于 UUID 主键并不高效。一个更健壮的实现应该手动构建查询,例如按 regionuser_id 的哈希值范围来并行读取,以实现更好的数据本地性和读取并行度。

未来的优化路径:

  1. 引入CDC实现流式同步: 将架构演进为真正的流式处理。使用 Debezium 从 CockroachDB 的变更流中捕获数据,推送到 Kafka,然后使用 Dask 或者专门的流处理引擎(如 Flink)消费 Kafka 数据并写入 ClickHouse。这将把延迟从分钟级降低到秒级。
  2. 使用专业工作流编排器: 将 Python 脚本的执行交给 Apache Airflow 或 Prefect 等工具。这能提供更强大的调度、依赖管理、重试机制和监控告警。
  3. 动态资源调配: 将 Dask 集群部署在 Kubernetes 上,并利用 KEDA (Kubernetes-based Event-Driven Autoscaling) 等工具,根据积压的数据量或任务队列长度动态调整 Dask Worker 的数量,实现成本和性能的最佳平衡。
  4. 数据质量监控: 在 Dask 的处理流程中加入数据验证步骤(例如使用 Great Expectations),确保在写入 ClickHouse 之前数据是干净和符合预期的,并将异常数据路由到专门的错误处理流程。

  目录