1. 技术问题的定义:读写模型分离的必然性与数据一致性挑战
在一个典型的复杂业务系统中,事务性操作(写入)和分析性查询(读取)对数据模型的要求截然不同。写入模型需要高度规范化以保证数据完整性和事务原子性,而读取模型,尤其是支撑复杂搜索、聚合和报表的场景,则期望数据是反规范化的、宽表化的,以便用最小的IO和计算代价获取结果。
将这两种需求强行捆绑在同一个数据库实例上,是大多数系统发展到一定阶段后必然会遇到的性能瓶颈。高并发的写入会与耗时长的复杂查询争抢数据库连接、CPU和IO资源,最终导致两者性能双双下降。
最初的解决方案,往往是简单的读写分离,但这并未解决根本问题,因为读取模型依然受限于写入模型的结构。一个更彻底的方案是命令查询职责分离(CQRS)。然而,CQRS架构的核心挑战在于如何可靠、高效地将写入侧(Command)的状态变更,投影(Project)到读取侧(Query)的数据存储中,并确保最终的数据一致性。
2. 方案权衡:从双写到事件驱动
方案 A: 应用程序双写
最直观的实现方式是在应用层进行双写。当一个写操作完成对主数据库的更新后,代码同步或异步地调用另一个服务,将数据写入到Solr中。
// 典型的双写模式伪代码
@Transactional
public void updateUserProfile(UserProfile profile) {
// 1. 更新主数据库 (PostgreSQL, MySQL, etc.)
userRepository.save(profile);
// 2. 更新搜索引擎 (Solr)
try {
solrClient.addBean("profiles", profile.toSolrDocument());
solrClient.commit("profiles");
} catch (SolrServerException | IOException e) {
// 关键问题:如何处理这里的失败?
// 如果回滚数据库事务,用户操作失败,但业务逻辑本身是成功的。
// 如果不回滚,数据就不一致了。
logger.error("Failed to update Solr index for user {}", profile.getId(), e);
// 可以发送一个补偿消息,但这增加了系统的复杂性。
}
}
劣势分析:
- 一致性问题: 这是一个典型的分布式事务问题。如果Solr写入失败,主数据库的事务是否应该回滚?这在业务上通常是不可接受的。采用补偿机制或重试逻辑会极大地增加代码复杂度和维护成本。
- 耦合性高: 写入服务必须清楚地知道所有下游的读取模型和它们的存储细节。每当增加一个新的读取视图(例如,除了Solr,还需要一个Redis缓存),就需要修改核心的写入逻辑。这违反了单一职责原则。
- 性能瓶颈: 同步写入会增加主流程的延迟。即使是异步写入,写入逻辑的复杂性也会随着下游消费者的增多而线性增长。
方案 B: 基于事件日志的异步投影架构
一个更健壮的架构是引入消息队列作为中间的事件日志。写入服务在完成本地事务后,向消息队列发布一个代表状态变更的事件。下游的多个投影服务(Projectors)可以独立地订阅这些事件,并更新各自的读取存储。
这个模型的核心是选择一个可靠的、支持持久化和重放的消息系统。Apache Pulsar因其计算与存储分离的架构、分层存储以及对消息持久性的强保证,成为理想选择。
graph TD
subgraph "写入侧 (Command Side)"
A[API Gateway] --> B{业务服务};
B -- 1. 持久化到主数据库 --> C[Transactional DB];
B -- 2. 发布领域事件 --> D[Apache Pulsar Topic];
end
subgraph "投影与查询侧 (Projection & Query Side)"
E[Spark Streaming Job] -- 3. 订阅事件 --> D;
E -- 4. 数据转换与聚合 --> F[Apache Solr Index];
G[查询服务] -- 5. 查询 --> F;
H[API Gateway] --> G;
end
优势分析:
- 解耦: 写入服务只负责发布事件,不关心谁消费、如何消费。添加新的读取模型只需要部署一个新的消费者,对现有系统零侵入。
- 韧性与可恢复性: Pulsar的持久化日志特性允许消费者在失败后从上一个检查点恢复,保证了消息的至少一次(At-Least-Once)或精确一次(Exactly-Once)处理。如果Solr索引损坏,可以通过重放(Replay)Pulsar Topic中的所有历史消息来重建整个索引。
- 可伸缩性: 写入和读取的处理能力可以独立扩展。如果投影逻辑复杂、计算密集,我们可以使用像Apache Spark这样的分布式计算引擎来消费Pulsar的数据流,并利用其强大的并行处理能力。
3. 最终选择与核心实现
我们最终选择方案B。该架构使用Pulsar作为事件总线,利用Spark Structured Streaming作为投影引擎,将数据实时写入Solr。这种组合兼顾了可靠性、可伸缩性和处理复杂业务逻辑的能力。
一个真实项目中的关键挑战是,如何确保这个异步、分布式的管道行为是正确且可验证的?这就是引入行为驱动开发(BDD)的原因。我们不只是测试单个组件,而是通过BDD测试来端到端地验证从“事件发布”到“数据可查询”的整个流程。
3.1 Pulsar 事件生产者
生产者必须保证消息的顺序和持久化。对于同一个业务实体(例如,同一张订单),其所有相关事件必须按顺序处理。这可以通过Pulsar的Keyed Producer实现。
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
// 定义事件模型
public class OrderEvent implements Serializable {
private String orderId;
private String eventType; // CREATED, UPDATED, CANCELLED
private long eventTimestamp;
private java.util.Map<String, Object> payload;
// getters and setters
}
public class EventPublisher {
private final Producer<OrderEvent> producer;
private final PulsarClient client;
public EventPublisher(String serviceUrl, String topicName) throws PulsarClientException {
this.client = PulsarClient.builder()
.serviceUrl(serviceUrl)
.build();
this.producer = client.newProducer(JSONSchema.of(OrderEvent.class))
.topic(topicName)
// 确保消息被持久化到磁盘
.blockIfQueueFull(true)
// 启用批处理以提高吞吐量
.enableBatching(true)
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
.batchingMaxMessages(1000)
// 保证相同orderId的消息进入同一个Pulsar分区,从而保证顺序性
.messageRouter(new KeyBasedBatcherBuilder())
.create();
}
public void publish(OrderEvent event) {
if (event.getOrderId() == null || event.getOrderId().isEmpty()) {
throw new IllegalArgumentException("OrderId cannot be null or empty for keyed publishing.");
}
// 使用 orderId 作为 key
producer.newMessage()
.key(event.getOrderId())
.value(event)
.sendAsync()
.exceptionally(ex -> {
// 生产环境级的错误处理:记录日志、推送到死信队列、告警
System.err.printf("Failed to send message for order %s: %s%n", event.getOrderId(), ex);
return null;
});
}
public void close() throws PulsarClientException {
producer.close();
client.close();
}
}
这里的坑在于,sendAsync是异步的。在服务关闭时,必须确保所有缓冲区的消息都已发送完成,通常需要调用producer.flush()和producer.close()并等待完成。
3.2 Spark Streaming 投影作业
Spark Structured Streaming 作业是这个架构的核心计算单元。它消费Pulsar流,执行数据转换,并将结果幂等地写入Solr。
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types._
object PulsarToSolrProjector {
// 定义Pulsar事件的Schema
val eventSchema = new StructType()
.add("orderId", StringType)
.add("eventType", StringType)
.add("eventTimestamp", LongType)
.add("payload", MapType(StringType, StringType))
def main(args: Array[String]): Unit = {
// 生产环境配置应来自外部文件
val spark = SparkSession.builder
.appName("PulsarToSolrOrderProjector")
.master("local[*]") // 在生产环境中应为 YARN or Kubernetes master
.config("spark.sql.streaming.checkpointLocation", "/tmp/spark/checkpoints/solr_projector")
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
import spark.implicits._
// 1. 从Pulsar读取原始事件流
val pulsarStreamDF = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://localhost:6650")
.option("admin.url", "http://localhost:8080")
.option("topic", "persistent://public/default/orders")
.option("subscriptionName", "spark-solr-projector-sub")
// 从最早的消息开始消费,用于重建或冷启动
.option("startingOffsets", "earliest")
.load()
// 2. 解析JSON数据并应用Schema
val eventsDF = pulsarStreamDF
.select(from_json(col("value").cast("string"), eventSchema).as("data"))
.select("data.*")
// 3. 核心业务逻辑:转换与聚合
// 在真实项目中,这里可能涉及从其他数据源(如HDFS, S3, JDBC)进行数据丰富
// 例如,根据payload中的productId去关联商品维度表
val solrProjectionDF = eventsDF
.withColumn("last_update_ts", from_unixtime(col("eventTimestamp") / 1000))
// 假设payload中有customer_id和total_amount
.select(
col("orderId").as("id"), // 映射到Solr的唯一ID
col("payload.customer_id").as("customer_id_s"),
col("payload.total_amount").cast(DoubleType).as("total_amount_d"),
col("eventType").as("status_s"),
col("last_update_ts").as("last_update_dt")
)
// 过滤掉不完整的记录
.filter(col("id").isNotNull)
// 4. 写入Solr
// 我们需要一个自定义的 Solr Sink 来实现幂等写入和批量更新
val query = solrProjectionDF.writeStream
.outputMode(OutputMode.Update())
.trigger(Trigger.ProcessingTime("30 seconds"))
// 使用 foreachBatch 提供了更灵活的控制
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
println(s"Processing batch $batchId...")
batchDF.persist() // 缓存以避免重计算
// 这是一个常见的错误:直接在 RDD/DataFrame 的 transformation 内部创建
// 昂贵的连接对象(如数据库连接)。正确做法是在每个分区上创建一次。
batchDF.foreachPartition { partitionOfRecords =>
if (partitionOfRecords.nonEmpty) {
// 在这里创建 SolrJ 客户端,它在 executor 端执行
val solrClient = SolrClientFactory.create("http://localhost:8983/solr/orders")
val docs = new java.util.ArrayList[org.apache.solr.common.SolrInputDocument]()
partitionOfRecords.foreach { row =>
val doc = new org.apache.solr.common.SolrInputDocument()
row.schema.fieldNames.foreach { fieldName =>
// 简单的类型转换,生产环境需要更健壮的逻辑
doc.addField(fieldName, row.getAs(fieldName))
}
docs.add(doc)
}
// 批量提交
try {
solrClient.add(docs)
solrClient.commit()
} catch {
case e: Exception =>
// 生产级的错误处理:重试、记录失败批次、告警
println(s"Failed to write partition to Solr: ${e.getMessage}")
throw e // 抛出异常让 Spark driver 知道任务失败并重试
} finally {
solrClient.close()
}
}
}
batchDF.unpersist()
}
.start()
query.awaitTermination()
}
}
实现要点与陷阱:
- Checkpointing:
spark.sql.streaming.checkpointLocation是保证端到端精确一次或至少一次语义的关键。Spark会在这里记录已经处理的offset。 - 幂等写入:
foreachBatch模式是实现自定义写入逻辑(包括幂等性)的最佳方式。Solr的add操作本身就是幂等的(如果文档有唯一ID),这简化了设计。如果写入目标不是幂等的,你需要在foreachBatch中实现自己的事务或状态管理逻辑。 - 连接管理: 必须在
foreachPartition内部创建Solr客户端实例,而不是在Driver端创建然后序列化到Executor。后者不仅会导致序列化问题,还会造成连接资源的浪费和竞争。
3.3 行为驱动的端到端验证
测试这个分布式系统是复杂的。单元测试可以验证Spark的转换逻辑,但无法保证整个数据流的正确性。BDD和Gherkin语法提供了一种描述系统行为的通用语言。
Gherkin Feature 文件 (order_projection.feature):
Feature: Order Event Projection to Solr
As a system architect, I want to ensure that order events published to Pulsar
are correctly transformed and indexed into Solr, making them searchable.
Scenario: A new order creation event is correctly projected
Given a clean Solr "orders" collection
And a unique order ID "ord-123"
When an "ORDER_CREATED" event for order "ord-123" with amount 99.99 is published to Pulsar
Then within 60 seconds, a Solr document with id "ord-123" must exist
And the Solr document "ord-123" should have field "status_s" with value "ORDER_CREATED"
And the Solr document "ord-123" should have field "total_amount_d" with value 99.99
Step Definition 实现 (使用 ScalaTest 和 Cucumber):
import io.cucumber.scala.{EN, ScalaDsl}
import org.apache.solr.client.solrj.impl.HttpSolrClient
import org.apache.solr.common.SolrInputDocument
import org.scalatest.matchers.should.Matchers
import java.util.concurrent.TimeUnit
import scala.util.Try
class OrderProjectionSteps extends ScalaDsl with EN with Matchers {
// 这些客户端和服务应在测试生命周期中正确初始化和销毁
var pulsarTestProducer: TestPulsarProducer = _
var solrTestClient: HttpSolrClient = _
var uniqueOrderId: String = _
// --- Test Setup/Teardown (e.g., @Before, @After) ---
// Before each scenario:
// 1. Initialize pulsarTestProducer for a test topic
// 2. Initialize solrTestClient
// 3. Clean the Solr collection
Given("""a clean Solr "orders" collection""") {
solrTestClient.deleteByQuery("*:*")
solrTestClient.commit()
}
Given("""a unique order ID {string}""") { (orderId: String) =>
uniqueOrderId = s"${orderId}-${System.currentTimeMillis()}"
}
When("""an {string} event for order {string} with amount {double} is published to Pulsar""") {
(eventType: String, orderIdPlaceholder: String, amount: Double) =>
val event = createOrderEvent(uniqueOrderId, eventType, Map("total_amount" -> amount.toString))
pulsarTestProducer.publish(event)
}
Then("""within {int} seconds, a Solr document with id {string} must exist""") {
(timeoutSec: Int, idPlaceholder: String) =>
val startTime = System.currentTimeMillis()
var docFound = false
while (!docFound && (System.currentTimeMillis() - startTime) < timeoutSec * 1000) {
val doc = Try(solrTestClient.getById(uniqueOrderId)).toOption.flatten
if (doc.isDefined) {
docFound = true
} else {
Thread.sleep(1000)
}
}
docFound should be(true)
}
And("""the Solr document {string} should have field {string} with value {string}""") {
(idPlaceholder: String, field: String, expectedValue: String) =>
val doc = solrTestClient.getById(uniqueOrderId)
doc.getFieldValue(field).toString should equal(expectedValue)
}
And("""the Solr document {string} should have field {string} with value {double}""") {
(idPlaceholder: String, field: String, expectedValue: Double) =>
val doc = solrTestClient.getById(uniqueOrderId)
doc.getFieldValue(field).asInstanceOf[Double] should be(expectedValue +- 0.001)
}
}
这个BDD测试套件在CI/CD流水线中运行,它启动一个包含了Pulsar、Spark和Solr的Docker Compose环境,然后执行测试。它不模拟任何组件,而是真实地测试了数据流经每个组件后的最终状态,为这个复杂的异步架构提供了极高的信心。
4. 架构的扩展性与局限性
该架构的扩展性体现在其解耦的本质上。若需为BI系统提供数据,可以部署一个新的Spark作业消费同一个Pulsar Topic,将数据聚合后写入ClickHouse或Doris。若需实时更新用户画像缓存,可以部署一个轻量级的Flink作业或Pulsar Function,将数据写入Redis。所有这些扩展都无需改动核心的写入服务。
然而,这个架构并非银弹。其固有的局限性在于最终一致性。从事件产生到数据在Solr中可被查询,存在一个可测量的延迟(通常是秒级)。对于那些要求强一致性读的场景(read-your-writes),此架构不适用,或需要客户端进行额外的补偿处理(例如,在写入后短时间内,查询优先从主数据库读取,之后再转向Solr)。
此外,系统的运维复杂性也显著增加。维护Pulsar集群、Spark集群和SolrCloud集群需要专业的SRE团队。全链路的监控、告警和日志分析变得至关重要,以快速定位是事件生产、流处理还是索引写入环节出现了问题。对事件Schema的演进管理也需要一套严格的流程,以保证向后兼容性,避免破坏下游消费者。