基于CDC实现单一数据源到ClickHouse与Solr的近实时双写架构权衡


在构建复杂的业务系统时,一个常见的技术挑战是数据需要以多种形式被消费。例如,核心业务数据存储在PostgreSQL或MySQL这类关系型数据库中,以保证事务的ACID特性;同时,运营和分析团队需要对这些数据进行高性能的即席SQL查询,这正是ClickHouse等OLAP引擎的用武之地;而面向用户的产品功能则可能需要强大的全文检索、分面、排序能力,这又是Solr或Elasticsearch这类搜索引擎的专长。

问题随之而来:如何维护这三者之间的数据一致性?

方案A:应用层双写——脆弱的捷径

最初步的构想往往是在业务代码中执行同步双写(或者说“三写”)。当一个写操作,比如创建一个新商品,发生时,业务逻辑会依次执行:

  1. 在PostgreSQL中开启事务。
  2. INSERT 数据到 products 表。
  3. 向ClickHouse的 products 表发起 INSERT 请求。
  4. 向Solr的 products 核心发送 add 命令。
  5. 提交PostgreSQL事务。

如果其中任何一步失败,则尝试回滚所有操作。这种方案在原型阶段看起来似乎可行,但在真实项目中,它是一条通往灾难的捷径。

这里的核心问题在于紧耦合责任错配

  • 性能瓶颈: 整个业务操作的响应时间取决于最慢的那个组件。Solr的索引合并、ClickHouse的后台Merge,或是网络抖动都可能拖慢核心的数据库事务,直接影响用户体验。
  • 可用性黑洞: 任何一个下游系统的临时不可用(例如Solr正在进行leader选举,或ClickHouse节点重启),都会导致核心业务的写入失败。整个系统的可用性变成了 min(PostgreSQL, ClickHouse, Solr),这在分布式环境中是不可接受的。
  • 数据一致性幻觉: 除非引入重量级的两阶段提交(2PC/XA)协议,否则无法保证原子性。在分布式环境下,这会带来巨大的性能开销和复杂性。如果没有2PC,一旦在提交PostgreSQL事务后,写入Solr失败,数据就会永久处于不一致状态。补偿逻辑(retries, reconciliation jobs)会迅速让代码变得复杂不堪。
  • 维护噩梦: 业务代码与基础设施细节(ClickHouse的连接池、Solr的Zookeeper地址等)高度耦合。每次下游系统变更或升级,都可能需要修改并重新部署核心业务服务。

在生产环境中,应用层双写是一种典型的反模式。它将基础设施的复杂性泄露到了应用层,违反了单一职责原则。

方案B:基于CDC的异步管道——解耦与韧性

一个更健壮的架构是承认主数据源的权威地位,并将数据同步的责任下沉到基础设施层。这就是基于变更数据捕获(Change Data Capture, CDC)的方案。

其核心思想是:应用服务只与主数据库(例如PostgreSQL)交互。所有的数据变更都会被记录在数据库的事务日志中(如PostgreSQL的WAL)。一个专门的CDC工具(如Debezium)会伪装成一个从库,读取这些日志,将行级别的 INSERT, UPDATE, DELETE 操作解析成结构化的事件,然后将这些事件发布到高吞吐、持久化的消息队列(如Apache Kafka)中。

一旦数据进入Kafka,就成为了可供任意下游消费的“事实之源”。我们可以为ClickHouse和Solr分别部署独立的消费者,它们订阅同一个Kafka topic,并各自负责将数据变更应用到自己的存储中。

graph TD
    subgraph "业务应用层"
        AppService[Application Service]
    end

    subgraph "主数据源"
        Postgres[(PostgreSQL)]
    end

    subgraph "CDC与消息总线"
        Debezium[Debezium Connector]
        Kafka[Apache Kafka Topic: pg.public.products]
    end

    subgraph "下游消费与存储"
        SolrSink[Kafka Connect Solr Sink]
        ClickHouseSink[Kafka Connect JDBC Sink]
        Solr[(Solr Collection)]
        ClickHouse[(ClickHouse Table)]
    end

    AppService -- "INSERT/UPDATE/DELETE" --> Postgres
    Postgres -- "Reads WAL Log" --> Debezium
    Debezium -- "Publishes Change Events" --> Kafka
    Kafka --> SolrSink
    Kafka --> ClickHouseSink
    SolrSink -- "Writes Documents" --> Solr
    ClickHouseSink -- "Writes Rows" --> ClickHouse

这个架构的优势是显而易见的:

  • 解耦: 应用服务彻底摆脱了对ClickHouse和Solr的感知。它的唯一职责是保证主数据库的事务正确性。
  • 韧性: Kafka作为中间缓冲层至关重要。如果Solr集群暂时不可用,数据会积压在Kafka topic中,待Solr恢复后,消费者会自动追赶进度,不会丢失任何变更。主业务的写入完全不受影响。
  • 可扩展性: 未来如果需要将数据同步到第四个系统(比如一个Redis缓存),我们只需要新增一个消费者组即可,对现有系统零侵入。
  • 性能: 核心事务的延迟只与主数据库的性能有关,同步到下游系统的过程是异步的,对用户无感知。

当然,这种方案的主要权衡是最终一致性。从数据在PostgreSQL中提交,到它在ClickHouse和Solr中可被查询,会存在一个可度量的延迟(通常在秒级)。对于大多数分析和搜索场景,这种延迟是完全可以接受的。

最终选择与理由

对于任何需要长期演进和维护的系统,方案B(CDC异步管道)是压倒性的胜利。它用可控的最终一致性换来了系统间的高度解耦、卓越的韧性和水平扩展能力。初期增加的部署复杂性(需要维护Kafka和Kafka Connect集群)在系统规模扩大后,会通过降低维护成本和提高系统稳定性得到百倍的回报。

在真实项目中,选择CDC方案意味着将数据同步从一个“应用逻辑问题”转变为一个“数据工程问题”,这是一个正确的职责划分。

核心实现概览

我们将以一个简化的products表为例,展示如何搭建这套管道。

1. 源数据库 (PostgreSQL)

假设我们有如下的products表:

-- 确保开启逻辑复制
-- 在 postgresql.conf 中设置:
-- wal_level = logical
-- max_wal_senders = 4
-- max_replication_slots = 4

CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    description TEXT,
    price NUMERIC(10, 2) NOT NULL,
    category VARCHAR(100),
    tags VARCHAR(255)[],
    last_updated_at TIMESTAMPTZ DEFAULT now()
);

-- 创建角色用于复制
CREATE ROLE debezium_user REPLICATION LOGIN PASSWORD 'password';
GRANT SELECT ON products TO debezium_user;

2. Debezium Connector for PostgreSQL

在Kafka Connect集群中,我们部署Debezium连接器来捕获products表的变更。

// debezium-postgres-source.json
{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "postgres-host",
        "database.port": "5432",
        "database.user": "debezium_user",
        "database.password": "password",
        "database.dbname": "inventory_db",
        "database.server.name": "inventory_postgres",
        "table.include.list": "public.products",
        "publication.autocreate.mode": "filtered", // 自动创建publication
        "plugin.name": "pgoutput",

        // Kafka 相关配置
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",

        // 错误处理:发送到死信队列
        "errors.log.enable": "true",
        "errors.log.include.messages": "true",
        "errors.tolerance": "all",
        "errors.deadletterqueue.topic.name": "dlq_inventory_topic",
        "errors.deadletterqueue.topic.replication.factor": "3",
        "errors.deadletterqueue.context.headers.enable": "true"
    }
}

部署后,products表的任何变更都会产生一条类似下面结构的JSON消息,并发送到名为 inventory_postgres.public.products 的Kafka topic中。

// Debezium 消息结构示例 (update操作)
{
  "before": { /* ...旧的行数据... */ },
  "after": {
    "id": 101,
    "name": "Quantum SSD 1TB",
    "description": "A high-speed solid state drive.",
    "price": 99.99,
    "category": "Storage",
    "tags": ["ssd", "fast", "1tb"],
    "last_updated_at": 1678886400000000
  },
  "source": { /* ...源信息, 如 LSN, txId ... */ },
  "op": "u", // c for create, u for update, d for delete
  "ts_ms": 1678886401000
}

3. Solr Sink Connector

现在,我们配置一个消费者将这些消息写入Solr。这里我们使用一个社区的Solr Sink Connector。

首先,Solr中需要有对应的Collection和Schema。对于products数据,一个合理的schema定义如下:

<!-- solr/server/solr/products_core/conf/managed-schema -->
<schema name="products" version="1.6">
  <field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false" />
  <field name="name" type="text_general" indexed="true" stored="true"/>
  <field name="description" type="text_general" indexed="true" stored="true"/>
  <field name="price" type="pdouble" indexed="true" stored="true"/>
  <field name="category" type="string" indexed="true" stored="true" docValues="true"/>
  <field name="tags" type="string" indexed="true" stored="true" multiValued="true"/>
  <field name="last_updated_at" type="pdate" indexed="true" stored="true"/>

  <uniqueKey>id</uniqueKey>

  <!-- 其他字段类型定义... -->
  <fieldType name="text_general" class="solr.TextField" positionIncrementGap="100">
    <!-- ...分析器配置... -->
  </fieldType>
</schema>

然后,部署Kafka Connect Solr Sink的配置:

// connect-solr-sink.json
{
    "name": "solr-sink-products",
    "config": {
        "connector.class": "com.github.jcustenborder.kafka.connect.solr.HttpSolrSinkConnector",
        "tasks.max": "2",
        "topics": "inventory_postgres.public.products",
        "solr.url": "http://solr-host1:8983/solr,http://solr-host2:8983/solr",
        "solr.collection": "products_core",
        "solr.commit.within": "5000", // 5秒内提交

        // 关键:处理Debezium格式
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false", // d操作会产生tombstone
        "transforms.unwrap.delete.handling.mode": "drop", // d操作的消息,unwrap后value为null

        // 处理删除操作
        // 当 unwrapped 消息 value 为 null 时,连接器会发送一个 delete by ID 请求
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",

        // 错误处理
        "errors.tolerance": "all",
        "errors.deadletterqueue.topic.name": "dlq_solr_sink_products"
    }
}

一个常见的坑在于处理删除操作。Debezium对DELETE操作会发送一条valuenull的消息(tombstone record)。我们需要配置ExtractNewRecordState SMT来正确地处理它,并确保Solr Sink连接器能将其转换为Solr的delete命令。

4. ClickHouse Sink Connector

向ClickHouse同步数据是最具挑战性的一环,因为ClickHouse的表引擎设计和标准OLTP数据库差异巨大。为了能够处理CDC流中的UPDATEDELETE,我们必须使用ClickHouse的特定引擎,ReplacingMergeTree是理想的选择。

首先,在ClickHouse中创建目标表:

CREATE TABLE inventory.products (
    `id` Int32,
    `name` String,
    `description` String,
    `price` Decimal(10, 2),
    `category` LowCardinality(String),
    `tags` Array(String),
    `last_updated_at` DateTime64(3, 'UTC'),
    `sign` Int8, -- 用于处理删除的标记列
    `version` UInt64 -- 版本列,用于ReplacingMergeTree判断最新记录
) ENGINE = ReplacingMergeTree(version)
ORDER BY id
PARTITION BY toYYYYMM(last_updated_at);

这里的关键点:

  • ReplacingMergeTree(version): 这个引擎会在后台合并数据时,对于ORDER BY key相同的行,只保留version列值最大的那一行。
  • version: 我们需要一个单调递增的版本号。Debezium事件中的source.lsn (Log Sequence Number) 或 source.ts_ms 是绝佳的版本来源。
  • sign: ReplacingMergeTree本身不直接支持DELETE。一种常见的模式是”标记删除”,即UPDATE一条记录,将其sign字段设为-1。查询时通过WHERE sign = 1来过滤掉已删除的数据。我们需要一个CollapsingMergeTree来最终物理删除它们,但为了简化,这里我们只做标记。

接下来是Kafka Connect JDBC Sink的配置,它需要一些转换逻辑才能将Debezium消息适配到ClickHouse表:

// connect-clickhouse-jdbc-sink.json
{
    "name": "clickhouse-sink-products",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "2",
        "topics": "inventory_postgres.public.products",
        "connection.url": "jdbc:clickhouse://clickhouse-host:8123/inventory",
        "table.name.format": "products",
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "pk.fields": "id", // 主键
        "delete.enabled": "true", // 允许删除

        // 核心转换逻辑
        "transforms": "unwrap,extractKey,castKey,addVersion,addSign",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        
        // 从Debezium的key中提取id
        "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extractKey.field": "id",
        "transforms.castKey.type": "org.apache.kafka.connect.transforms.Cast$Key",
        "transforms.castKey.spec": "int32",

        // 从source块中提取ts_ms作为version列
        "transforms.addVersion.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.addVersion.offset.field": "source.ts_ms",
        "transforms.addVersion.static.field": "version", // 目标列名
        "transforms.addVersion.static.value": "0", // 占位符,会被offset覆盖

        // 根据op字段添加sign列
        "transforms.addSign.type": "org.apache.kafka.connect.transforms.HeaderToField",
        "transforms.addSign.headers": "['__debezium-op']", // Debezium默认会把op放到header
        "transforms.addSign.fields.map": "{'__debezium-op': 'op_char'}",
        // 这里需要自定义一个SMT来将 c/u -> 1, d -> -1,或者在下游ClickHouse用Materialized View处理
        // 为简化,假设我们有一个自定义SMT: com.my.transforms.OpToSign
        // "transforms.mapSign.type": "com.my.transforms.OpToSign",
        // "transforms.mapSign.op.field": "op_char",
        // "transforms.mapSign.sign.field": "sign",

        // 自动创建和演进表结构,生产环境慎用
        "auto.create": "false",
        "auto.evolve": "false",

        "errors.tolerance": "all",
        "errors.deadletterqueue.topic.name": "dlq_clickhouse_sink_products"
    }
}

这个ClickHouse Sink的配置相当复杂。在真实项目中,使用多个内置SMT链可能仍然不够灵活,特别是处理sign列。通常需要开发一个简单的自定义SMT,或者在ClickHouse端创建一个物化视图,监听一个中间表,然后进行转换后写入最终的ReplacingMergeTree表。

架构的扩展性与局限性

当前这套基于CDC的架构具有良好的扩展性。当业务增长,可以通过增加Kafka分区和Connect task数量来水平扩展数据管道的吞吐量。引入新的下游数据消费者对现有系统零影响。

然而,这套架构也存在固有的局限性,必须清醒地认识到其适用边界:

  1. 最终一致性延迟: 这是最核心的权衡。整个管道的端到端延迟(p99延迟可能在几秒到几十秒)必须是业务可以接受的。它不适用于需要强一致性的场景,比如检查用户账户余额后立即进行扣款。
  2. Schema演进的复杂性: 当源数据库的表结构发生变更(如ADD COLUMN),需要一套严谨的流程来保证管道不中断。通常的顺序是:先更新并部署下游消费者的配置(如Solr Schema API),确保它们能处理新字段,然后再在源数据库执行ALTER TABLE。顺序错误会导致消息反序列化失败,堵塞整个管道。
  3. 操作复杂性: 引入了Kafka、Zookeeper、Kafka Connect等一套分布式系统,对运维团队提出了更高的要求。你需要完善的监控来追踪复制延迟、连接器任务状态、消费者组lag等关键指标。
  4. 数据回溯与修复: 当出现逻辑错误导致下游数据污染时,修复数据是一项挑战。可能需要停止消费者,手动修正Kafka topic中的数据(或让消费者跳过错误的offset),或者对下游系统进行全量重建。Debezium的快照功能可以帮助进行全量同步,但对于TB级别的大表,这个过程耗时且消耗资源。

  目录