在构建复杂的业务系统时,一个常见的技术挑战是数据需要以多种形式被消费。例如,核心业务数据存储在PostgreSQL或MySQL这类关系型数据库中,以保证事务的ACID特性;同时,运营和分析团队需要对这些数据进行高性能的即席SQL查询,这正是ClickHouse等OLAP引擎的用武之地;而面向用户的产品功能则可能需要强大的全文检索、分面、排序能力,这又是Solr或Elasticsearch这类搜索引擎的专长。
问题随之而来:如何维护这三者之间的数据一致性?
方案A:应用层双写——脆弱的捷径
最初步的构想往往是在业务代码中执行同步双写(或者说“三写”)。当一个写操作,比如创建一个新商品,发生时,业务逻辑会依次执行:
- 在PostgreSQL中开启事务。
-
INSERT
数据到products
表。 - 向ClickHouse的
products
表发起INSERT
请求。 - 向Solr的
products
核心发送add
命令。 - 提交PostgreSQL事务。
如果其中任何一步失败,则尝试回滚所有操作。这种方案在原型阶段看起来似乎可行,但在真实项目中,它是一条通往灾难的捷径。
这里的核心问题在于紧耦合与责任错配。
- 性能瓶颈: 整个业务操作的响应时间取决于最慢的那个组件。Solr的索引合并、ClickHouse的后台Merge,或是网络抖动都可能拖慢核心的数据库事务,直接影响用户体验。
- 可用性黑洞: 任何一个下游系统的临时不可用(例如Solr正在进行leader选举,或ClickHouse节点重启),都会导致核心业务的写入失败。整个系统的可用性变成了
min(PostgreSQL, ClickHouse, Solr)
,这在分布式环境中是不可接受的。 - 数据一致性幻觉: 除非引入重量级的两阶段提交(2PC/XA)协议,否则无法保证原子性。在分布式环境下,这会带来巨大的性能开销和复杂性。如果没有2PC,一旦在提交PostgreSQL事务后,写入Solr失败,数据就会永久处于不一致状态。补偿逻辑(retries, reconciliation jobs)会迅速让代码变得复杂不堪。
- 维护噩梦: 业务代码与基础设施细节(ClickHouse的连接池、Solr的Zookeeper地址等)高度耦合。每次下游系统变更或升级,都可能需要修改并重新部署核心业务服务。
在生产环境中,应用层双写是一种典型的反模式。它将基础设施的复杂性泄露到了应用层,违反了单一职责原则。
方案B:基于CDC的异步管道——解耦与韧性
一个更健壮的架构是承认主数据源的权威地位,并将数据同步的责任下沉到基础设施层。这就是基于变更数据捕获(Change Data Capture, CDC)的方案。
其核心思想是:应用服务只与主数据库(例如PostgreSQL)交互。所有的数据变更都会被记录在数据库的事务日志中(如PostgreSQL的WAL)。一个专门的CDC工具(如Debezium)会伪装成一个从库,读取这些日志,将行级别的 INSERT
, UPDATE
, DELETE
操作解析成结构化的事件,然后将这些事件发布到高吞吐、持久化的消息队列(如Apache Kafka)中。
一旦数据进入Kafka,就成为了可供任意下游消费的“事实之源”。我们可以为ClickHouse和Solr分别部署独立的消费者,它们订阅同一个Kafka topic,并各自负责将数据变更应用到自己的存储中。
graph TD subgraph "业务应用层" AppService[Application Service] end subgraph "主数据源" Postgres[(PostgreSQL)] end subgraph "CDC与消息总线" Debezium[Debezium Connector] Kafka[Apache Kafka Topic: pg.public.products] end subgraph "下游消费与存储" SolrSink[Kafka Connect Solr Sink] ClickHouseSink[Kafka Connect JDBC Sink] Solr[(Solr Collection)] ClickHouse[(ClickHouse Table)] end AppService -- "INSERT/UPDATE/DELETE" --> Postgres Postgres -- "Reads WAL Log" --> Debezium Debezium -- "Publishes Change Events" --> Kafka Kafka --> SolrSink Kafka --> ClickHouseSink SolrSink -- "Writes Documents" --> Solr ClickHouseSink -- "Writes Rows" --> ClickHouse
这个架构的优势是显而易见的:
- 解耦: 应用服务彻底摆脱了对ClickHouse和Solr的感知。它的唯一职责是保证主数据库的事务正确性。
- 韧性: Kafka作为中间缓冲层至关重要。如果Solr集群暂时不可用,数据会积压在Kafka topic中,待Solr恢复后,消费者会自动追赶进度,不会丢失任何变更。主业务的写入完全不受影响。
- 可扩展性: 未来如果需要将数据同步到第四个系统(比如一个Redis缓存),我们只需要新增一个消费者组即可,对现有系统零侵入。
- 性能: 核心事务的延迟只与主数据库的性能有关,同步到下游系统的过程是异步的,对用户无感知。
当然,这种方案的主要权衡是最终一致性。从数据在PostgreSQL中提交,到它在ClickHouse和Solr中可被查询,会存在一个可度量的延迟(通常在秒级)。对于大多数分析和搜索场景,这种延迟是完全可以接受的。
最终选择与理由
对于任何需要长期演进和维护的系统,方案B(CDC异步管道)是压倒性的胜利。它用可控的最终一致性换来了系统间的高度解耦、卓越的韧性和水平扩展能力。初期增加的部署复杂性(需要维护Kafka和Kafka Connect集群)在系统规模扩大后,会通过降低维护成本和提高系统稳定性得到百倍的回报。
在真实项目中,选择CDC方案意味着将数据同步从一个“应用逻辑问题”转变为一个“数据工程问题”,这是一个正确的职责划分。
核心实现概览
我们将以一个简化的products
表为例,展示如何搭建这套管道。
1. 源数据库 (PostgreSQL)
假设我们有如下的products
表:
-- 确保开启逻辑复制
-- 在 postgresql.conf 中设置:
-- wal_level = logical
-- max_wal_senders = 4
-- max_replication_slots = 4
CREATE TABLE products (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT,
price NUMERIC(10, 2) NOT NULL,
category VARCHAR(100),
tags VARCHAR(255)[],
last_updated_at TIMESTAMPTZ DEFAULT now()
);
-- 创建角色用于复制
CREATE ROLE debezium_user REPLICATION LOGIN PASSWORD 'password';
GRANT SELECT ON products TO debezium_user;
2. Debezium Connector for PostgreSQL
在Kafka Connect集群中,我们部署Debezium连接器来捕获products
表的变更。
// debezium-postgres-source.json
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres-host",
"database.port": "5432",
"database.user": "debezium_user",
"database.password": "password",
"database.dbname": "inventory_db",
"database.server.name": "inventory_postgres",
"table.include.list": "public.products",
"publication.autocreate.mode": "filtered", // 自动创建publication
"plugin.name": "pgoutput",
// Kafka 相关配置
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
// 错误处理:发送到死信队列
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq_inventory_topic",
"errors.deadletterqueue.topic.replication.factor": "3",
"errors.deadletterqueue.context.headers.enable": "true"
}
}
部署后,products
表的任何变更都会产生一条类似下面结构的JSON消息,并发送到名为 inventory_postgres.public.products
的Kafka topic中。
// Debezium 消息结构示例 (update操作)
{
"before": { /* ...旧的行数据... */ },
"after": {
"id": 101,
"name": "Quantum SSD 1TB",
"description": "A high-speed solid state drive.",
"price": 99.99,
"category": "Storage",
"tags": ["ssd", "fast", "1tb"],
"last_updated_at": 1678886400000000
},
"source": { /* ...源信息, 如 LSN, txId ... */ },
"op": "u", // c for create, u for update, d for delete
"ts_ms": 1678886401000
}
3. Solr Sink Connector
现在,我们配置一个消费者将这些消息写入Solr。这里我们使用一个社区的Solr Sink Connector。
首先,Solr中需要有对应的Collection和Schema。对于products
数据,一个合理的schema定义如下:
<!-- solr/server/solr/products_core/conf/managed-schema -->
<schema name="products" version="1.6">
<field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false" />
<field name="name" type="text_general" indexed="true" stored="true"/>
<field name="description" type="text_general" indexed="true" stored="true"/>
<field name="price" type="pdouble" indexed="true" stored="true"/>
<field name="category" type="string" indexed="true" stored="true" docValues="true"/>
<field name="tags" type="string" indexed="true" stored="true" multiValued="true"/>
<field name="last_updated_at" type="pdate" indexed="true" stored="true"/>
<uniqueKey>id</uniqueKey>
<!-- 其他字段类型定义... -->
<fieldType name="text_general" class="solr.TextField" positionIncrementGap="100">
<!-- ...分析器配置... -->
</fieldType>
</schema>
然后,部署Kafka Connect Solr Sink的配置:
// connect-solr-sink.json
{
"name": "solr-sink-products",
"config": {
"connector.class": "com.github.jcustenborder.kafka.connect.solr.HttpSolrSinkConnector",
"tasks.max": "2",
"topics": "inventory_postgres.public.products",
"solr.url": "http://solr-host1:8983/solr,http://solr-host2:8983/solr",
"solr.collection": "products_core",
"solr.commit.within": "5000", // 5秒内提交
// 关键:处理Debezium格式
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false", // d操作会产生tombstone
"transforms.unwrap.delete.handling.mode": "drop", // d操作的消息,unwrap后value为null
// 处理删除操作
// 当 unwrapped 消息 value 为 null 时,连接器会发送一个 delete by ID 请求
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
// 错误处理
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq_solr_sink_products"
}
}
一个常见的坑在于处理删除操作。Debezium对DELETE
操作会发送一条value
为null
的消息(tombstone record)。我们需要配置ExtractNewRecordState
SMT来正确地处理它,并确保Solr Sink连接器能将其转换为Solr的delete
命令。
4. ClickHouse Sink Connector
向ClickHouse同步数据是最具挑战性的一环,因为ClickHouse的表引擎设计和标准OLTP数据库差异巨大。为了能够处理CDC流中的UPDATE
和DELETE
,我们必须使用ClickHouse的特定引擎,ReplacingMergeTree
是理想的选择。
首先,在ClickHouse中创建目标表:
CREATE TABLE inventory.products (
`id` Int32,
`name` String,
`description` String,
`price` Decimal(10, 2),
`category` LowCardinality(String),
`tags` Array(String),
`last_updated_at` DateTime64(3, 'UTC'),
`sign` Int8, -- 用于处理删除的标记列
`version` UInt64 -- 版本列,用于ReplacingMergeTree判断最新记录
) ENGINE = ReplacingMergeTree(version)
ORDER BY id
PARTITION BY toYYYYMM(last_updated_at);
这里的关键点:
-
ReplacingMergeTree(version)
: 这个引擎会在后台合并数据时,对于ORDER BY
key相同的行,只保留version
列值最大的那一行。 -
version
列: 我们需要一个单调递增的版本号。Debezium事件中的source.lsn
(Log Sequence Number) 或source.ts_ms
是绝佳的版本来源。 -
sign
列:ReplacingMergeTree
本身不直接支持DELETE
。一种常见的模式是”标记删除”,即UPDATE
一条记录,将其sign
字段设为-1。查询时通过WHERE sign = 1
来过滤掉已删除的数据。我们需要一个CollapsingMergeTree
来最终物理删除它们,但为了简化,这里我们只做标记。
接下来是Kafka Connect JDBC Sink的配置,它需要一些转换逻辑才能将Debezium消息适配到ClickHouse表:
// connect-clickhouse-jdbc-sink.json
{
"name": "clickhouse-sink-products",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "2",
"topics": "inventory_postgres.public.products",
"connection.url": "jdbc:clickhouse://clickhouse-host:8123/inventory",
"table.name.format": "products",
"insert.mode": "upsert",
"pk.mode": "record_key",
"pk.fields": "id", // 主键
"delete.enabled": "true", // 允许删除
// 核心转换逻辑
"transforms": "unwrap,extractKey,castKey,addVersion,addSign",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
// 从Debezium的key中提取id
"transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field": "id",
"transforms.castKey.type": "org.apache.kafka.connect.transforms.Cast$Key",
"transforms.castKey.spec": "int32",
// 从source块中提取ts_ms作为version列
"transforms.addVersion.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addVersion.offset.field": "source.ts_ms",
"transforms.addVersion.static.field": "version", // 目标列名
"transforms.addVersion.static.value": "0", // 占位符,会被offset覆盖
// 根据op字段添加sign列
"transforms.addSign.type": "org.apache.kafka.connect.transforms.HeaderToField",
"transforms.addSign.headers": "['__debezium-op']", // Debezium默认会把op放到header
"transforms.addSign.fields.map": "{'__debezium-op': 'op_char'}",
// 这里需要自定义一个SMT来将 c/u -> 1, d -> -1,或者在下游ClickHouse用Materialized View处理
// 为简化,假设我们有一个自定义SMT: com.my.transforms.OpToSign
// "transforms.mapSign.type": "com.my.transforms.OpToSign",
// "transforms.mapSign.op.field": "op_char",
// "transforms.mapSign.sign.field": "sign",
// 自动创建和演进表结构,生产环境慎用
"auto.create": "false",
"auto.evolve": "false",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq_clickhouse_sink_products"
}
}
这个ClickHouse Sink的配置相当复杂。在真实项目中,使用多个内置SMT链可能仍然不够灵活,特别是处理sign
列。通常需要开发一个简单的自定义SMT,或者在ClickHouse端创建一个物化视图,监听一个中间表,然后进行转换后写入最终的ReplacingMergeTree
表。
架构的扩展性与局限性
当前这套基于CDC的架构具有良好的扩展性。当业务增长,可以通过增加Kafka分区和Connect task数量来水平扩展数据管道的吞吐量。引入新的下游数据消费者对现有系统零影响。
然而,这套架构也存在固有的局限性,必须清醒地认识到其适用边界:
- 最终一致性延迟: 这是最核心的权衡。整个管道的端到端延迟(p99延迟可能在几秒到几十秒)必须是业务可以接受的。它不适用于需要强一致性的场景,比如检查用户账户余额后立即进行扣款。
- Schema演进的复杂性: 当源数据库的表结构发生变更(如
ADD COLUMN
),需要一套严谨的流程来保证管道不中断。通常的顺序是:先更新并部署下游消费者的配置(如Solr Schema API),确保它们能处理新字段,然后再在源数据库执行ALTER TABLE
。顺序错误会导致消息反序列化失败,堵塞整个管道。 - 操作复杂性: 引入了Kafka、Zookeeper、Kafka Connect等一套分布式系统,对运维团队提出了更高的要求。你需要完善的监控来追踪复制延迟、连接器任务状态、消费者组lag等关键指标。
- 数据回溯与修复: 当出现逻辑错误导致下游数据污染时,修复数据是一项挑战。可能需要停止消费者,手动修正Kafka topic中的数据(或让消费者跳过错误的offset),或者对下游系统进行全量重建。Debezium的快照功能可以帮助进行全量同步,但对于TB级别的大表,这个过程耗时且消耗资源。