一个看似简单的需求摆在面前:我们需要为云服务构建一个计量(Metering)系统。它必须能处理每秒数十万次的资源使用事件,为运营团队提供近乎实时的监控视图,同时,这些数据必须100%可靠地归档,用于月底的精确计费和年度的业务分析。实时性要求指向了Prometheus这样的时序数据库,而长期、海量、低成本的存储与分析则天然地选择了Hadoop生态。问题在于,如何保证一个事件既被Prometheus正确接收,又被Hadoop生态系统完整归档?
在真实项目中,任何跨多个独立系统的写入操作都隐含着数据不一致的风险。如果服务先写入Prometheus成功,但在写入Hadoop的数据管道(例如Kafka)时宕机,我们就有了监控数据,却丢失了计费数据,这是灾难性的。反之亦然。这本质上是一个分布式事务问题。
方案A:先写TSDB,后异步归档
最初的构想非常直接:计量服务接收到数据,直接通过Remote Write协议写入Prometheus集群。同时,部署一个独立的归档服务,定期(例如每小时)从Prometheus查询原始数据,批量转换为Parquet格式,再写入HDFS。
sequenceDiagram participant Client participant MeteringService participant Prometheus participant Archiver participant HDFS Client->>+MeteringService: Report Usage Event MeteringService->>+Prometheus: Remote Write Metrics Prometheus-->>-MeteringService: Ack MeteringService-->>-Client: OK loop Every 1 Hour Archiver->>+Prometheus: Query Raw Data Prometheus-->>-Archiver: Return Time Series Data Archiver->>Archiver: Transform to Parquet Archiver->>+HDFS: Write Parquet File HDFS-->>-Archiver: Ack end
这种架构的优势是简单,组件职责清晰。但它的致命缺陷在于可靠性。
- 数据丢失窗口:在归档服务两次执行的间隔期内,如果Prometheus节点发生不可逆的磁盘故障,这部分数据将永久丢失。对于计费系统,这是不可接受的。
- Prometheus压力:大规模、高频的范围查询对Prometheus是一个巨大的负担,可能影响其作为核心监控系统的稳定性。
- 数据一致性:无法保证查询操作能精确地、无重复、无遗漏地导出所有数据,尤其是在网络分区或服务重启的边缘情况下。
这个方案在非关键业务监控场景下或许堪用,但对于要求审计级别数据完整性的计费系统,它过于脆弱。
方案B:事件总线驱动的双写
为了解决数据丢失问题,引入消息队列(如Kafka)作为中间缓冲层是业界的标准做法。计量服务将事件写入Kafka,然后由两个独立的消费者组分别消费数据,一个写入Prometheus,另一个写入HDFS。
sequenceDiagram participant Client participant MeteringService participant Kafka participant PrometheusConsumer participant HadoopConsumer participant Prometheus participant HDFS Client->>+MeteringService: Report Usage Event MeteringService->>+Kafka: Produce Event Kafka-->>-MeteringService: Ack MeteringService-->>-Client: OK par PrometheusConsumer->>+Kafka: Consume Event Kafka-->>-PrometheusConsumer: Deliver Event PrometheusConsumer->>+Prometheus: Remote Write Prometheus-->>-PrometheusConsumer: Ack PrometheusConsumer->>+Kafka: Commit Offset and HadoopConsumer->>+Kafka: Consume Event Kafka-->>-HadoopConsumer: Deliver Event HadoopConsumer->>+HDFS: Write to Staging Area HDFS-->>-HadoopConsumer: Ack HadoopConsumer->>+Kafka: Commit Offset end
该方案通过Kafka的持久化能力,极大地提升了数据的可靠性。只要事件成功写入Kafka,后续的处理失败都可以通过重试来解决。然而,它引入了一个新的、更隐蔽的一致性问题:两个消费者是独立的事务单元。
设想一个场景:PrometheusConsumer
成功消费了消息并写入了Prometheus,然后提交了Offset。但HadoopConsumer
在处理同一条消息时,由于HDFS暂时不可用或自身逻辑错误,反复失败并最终被阻塞或跳过。结果是,运营仪表盘上显示了用户的使用量,但计费系统里却没有这条记录。两个系统的数据产生了永久性的分歧。
最终选择:基于Saga模式的编排式数据持久化
我们需要的是一个原子性的操作,确保“写入Prometheus”和“写入Hadoop管道”这两个动作要么都成功,要么都失败。由于这两个系统不支持两阶段提交(2PC),我们必须在应用层实现最终一致性。Saga模式是解决这类问题的经典方案。
我们将整个流程定义为一个Saga,它由一系列的本地事务(步骤)组成。每个步骤都有一个对应的补偿操作。如果任何步骤失败,Saga协调器会依次调用前面已成功步骤的补偿操作,从而在逻辑上“回滚”整个事务。
我们的Saga包含两个核心步骤:
-
PersistToPrometheus
: 将计量数据写入Prometheus。 -
StageForHadoop
: 将相同的数据写入Kafka,作为进入Hadoop生态的持久化入口。
如果PersistToPrometheus
成功,但StageForHadoop
失败,我们需要执行PersistToPrometheus
的补偿操作。这里的坑在于,从Prometheus中精确删除一个数据点是困难且低效的。一个更务实的补偿方案是:写入一个“冲销”指标。例如,如果原始指标是service_usage_bytes{user="A"} 1024
,补偿操作就是写入service_usage_bytes_correction{user="A"} -1024
。这样在查询时,将原始指标和冲销指标相加,就能得到正确的结果。
核心实现概览
我们将使用Java来实现这个Saga协调器。首先定义Saga步骤的接口。
// src/main/java/com/example/metering/saga/SagaStep.java
package com.example.metering.saga;
import java.util.concurrent.CompletableFuture;
/**
* 定义Saga流程中的一个步骤.
* 每个步骤都包含一个正向操作 (action) 和一个补偿操作 (compensation).
* @param <T> Saga上下文对象的类型
*/
public interface SagaStep<T> {
/**
* 执行正向操作.
* @param context 包含执行所需数据的Saga上下文
* @return 一个表示异步操作完成的CompletableFuture
*/
CompletableFuture<Void> action(SagaContext<T> context);
/**
* 执行补偿操作.
* 只有在action成功后,后续步骤失败时才会被调用.
* @param context 包含补偿所需数据的Saga上下文
* @return 一个表示异步补偿操作完成的CompletableFuture
*/
CompletableFuture<Void> compensation(SagaContext<T> context);
/**
* 步骤的名称,用于日志和调试.
* @return 步骤名称
*/
String getName();
}
SagaContext
是用于在Saga各步骤间传递数据的上下文对象。
// src/main/java/com/example/metering/saga/SagaContext.java
package com.example.metering.saga;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
public class SagaContext<T> {
private final String sagaId;
private final T payload;
private final Map<String, Object> stepData; // 用于存储每个步骤的中间结果
public SagaContext(T payload) {
this.sagaId = UUID.randomUUID().toString();
this.payload = payload;
this.stepData = new ConcurrentHashMap<>();
}
public String getSagaId() {
return sagaId;
}
public T getPayload() {
return payload;
}
public void putStepData(String key, Object value) {
this.stepData.put(key, value);
}
@SuppressWarnings("unchecked")
public <V> V getStepData(String key) {
return (V) this.stepData.get(key);
}
}
接下来是Saga协调器的实现,它负责按顺序执行步骤并处理失败。
// src/main/java/com/example/metering/saga/SagaOrchestrator.java
package com.example.metering.saga;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Stack;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
public class SagaOrchestrator<T> {
private static final Logger logger = LoggerFactory.getLogger(SagaOrchestrator.class);
private final List<SagaStep<T>> steps;
private final Executor executor;
public SagaOrchestrator(List<SagaStep<T>> steps, Executor executor) {
this.steps = steps;
this.executor = executor;
}
public CompletableFuture<Void> execute(T payload) {
SagaContext<T> context = new SagaContext<>(payload);
Stack<SagaStep<T>> executedSteps = new Stack<>();
CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
for (SagaStep<T> step : steps) {
future = future.thenComposeAsync(v -> {
logger.info("Saga [{}]: Executing step [{}]", context.getSagaId(), step.getName());
return step.action(context)
.thenAccept(result -> executedSteps.push(step))
.exceptionally(ex -> {
logger.error("Saga [{}]: Step [{}] failed. Starting compensation.", context.getSagaId(), step.getName(), ex);
throw new SagaExecutionException(ex, executedSteps);
});
}, executor);
}
return future.exceptionally(ex -> {
if (ex.getCause() instanceof SagaExecutionException) {
SagaExecutionException sagaEx = (SagaExecutionException) ex.getCause();
compensate(context, sagaEx.getExecutedSteps());
} else {
logger.error("Saga [{}]: Unexpected error during execution.", context.getSagaId(), ex);
}
// 重新抛出异常,让调用方知道Saga失败了
throw new RuntimeException("Saga execution failed and compensation was triggered.", ex);
});
}
private void compensate(SagaContext<T> context, Stack<SagaStep<T>> executedSteps) {
logger.info("Saga [{}]: Starting compensation for {} steps.", context.getSagaId(), executedSteps.size());
while (!executedSteps.isEmpty()) {
SagaStep<T> stepToCompensate = executedSteps.pop();
try {
logger.warn("Saga [{}]: Compensating for step [{}]", context.getSagaId(), stepToCompensate.getName());
// 在真实项目中,这里的补偿失败需要有重试和告警机制
stepToCompensate.compensation(context).join();
} catch (Exception ex) {
// 补偿失败是一个严重问题,必须记录并告警
logger.error("Saga [{}]: CRITICAL - Compensation for step [{}] failed. Manual intervention required.",
context.getSagaId(), stepToCompensate.getName(), ex);
// 停止补偿链,因为状态已不确定
break;
}
}
}
private static class SagaExecutionException extends RuntimeException {
private final Stack<SagaStep<T>> executedSteps;
public SagaExecutionException(Throwable cause, Stack<SagaStep<T>> executedSteps) {
super(cause);
this.executedSteps = executedSteps;
}
public Stack<SagaStep<T>> getExecutedSteps() {
return executedSteps;
}
}
}
现在,我们来实现具体的步骤。PersistToPrometheusStep
负责与Prometheus交互。这里我们模拟一个HTTP客户端来发送Remote Write请求。在生产环境中,应该使用一个健壮的HTTP客户端库(如OkHttp)并处理连接池、超时和重试。
// src/main/java/com/example/metering/steps/PersistToPrometheusStep.java
package com.example.metering.steps;
// 假设的依赖
import com.example.metering.model.MeteringEvent;
import com.example.metering.saga.SagaContext;
import com.example.metering.saga.SagaStep;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
public class PersistToPrometheusStep implements SagaStep<MeteringEvent> {
private static final Logger logger = LoggerFactory.getLogger(PersistToPrometheusStep.class);
private final HttpClient httpClient;
private final String remoteWriteUrl;
public PersistToPrometheusStep(String remoteWriteUrl) {
this.remoteWriteUrl = remoteWriteUrl;
this.httpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_2).build();
}
@Override
public String getName() {
return "PersistToPrometheus";
}
@Override
public CompletableFuture<Void> action(SagaContext<MeteringEvent> context) {
MeteringEvent event = context.getPayload();
// 实际实现中,这里需要将MeteringEvent转换为Prometheus Remote Write的Protobuf格式
// 并使用Snappy压缩。为简化,我们只发送一个模拟的字符串。
String requestBody = String.format("service_usage_bytes{user=\"%s\", service=\"%s\"} %d %d",
event.getUserId(), event.getServiceId(), event.getValue(), event.getTimestamp());
return sendToPrometheus(requestBody, context.getSagaId());
}
@Override
public CompletableFuture<Void> compensation(SagaContext<MeteringEvent> context) {
MeteringEvent event = context.getPayload();
// 补偿操作:写入一个负值
String requestBody = String.format("service_usage_bytes_correction{user=\"%s\", service=\"%s\"} %d %d",
event.getUserId(), event.getServiceId(), -event.getValue(), event.getTimestamp());
logger.warn("Executing compensation for Prometheus. Saga ID: {}", context.getSagaId());
// 补偿操作的失败后果更严重,需要更强的重试机制
return sendToPrometheus(requestBody, context.getSagaId());
}
private CompletableFuture<Void> sendToPrometheus(String body, String sagaId) {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(remoteWriteUrl))
.header("Content-Type", "text/plain") // 实际应为 "application/x-protobuf"
.header("X-Saga-ID", sagaId)
.POST(HttpRequest.BodyPublishers.ofString(body))
.build();
return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
.thenAccept(response -> {
if (response.statusCode() >= 200 && response.statusCode() < 300) {
logger.info("Successfully sent data to Prometheus. Saga ID: {}, Status: {}", sagaId, response.statusCode());
} else {
// 抛出异常以触发Saga的回滚
logger.error("Failed to send data to Prometheus. Saga ID: {}, Status: {}, Body: {}",
sagaId, response.statusCode(), response.body());
throw new RuntimeException("Prometheus remote write failed with status " + response.statusCode());
}
});
}
}
StageForHadoopStep
则负责向Kafka写入数据。
// src/main/java/com/example/metering/steps/StageForHadoopStep.java
package com.example.metering.steps;
import com.example.metering.model.MeteringEvent;
import com.example.metering.saga.SagaContext;
import com.example.metering.saga.SagaStep;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
public class StageForHadoopStep implements SagaStep<MeteringEvent> {
private static final Logger logger = LoggerFactory.getLogger(StageForHadoopStep.class);
private final KafkaProducer<String, MeteringEvent> producer;
private final String topic;
public StageForHadoopStep(KafkaProducer<String, MeteringEvent> producer, String topic) {
this.producer = producer;
this.topic = topic;
}
@Override
public String getName() {
return "StageForHadoop";
}
@Override
public CompletableFuture<Void> action(SagaContext<MeteringEvent> context) {
MeteringEvent event = context.getPayload();
ProducerRecord<String, MeteringEvent> record = new ProducerRecord<>(topic, event.getUserId(), event);
CompletableFuture<Void> future = new CompletableFuture<>();
producer.send(record, (metadata, exception) -> {
if (exception != null) {
logger.error("Failed to send record to Kafka for saga [{}].", context.getSagaId(), exception);
future.completeExceptionally(exception);
} else {
logger.info("Successfully staged record for Hadoop in topic [{}], partition [{}], offset [{}].",
metadata.topic(), metadata.partition(), metadata.offset());
future.complete(null);
}
});
return future;
}
@Override
public CompletableFuture<Void> compensation(SagaContext<MeteringEvent> context) {
// 向Kafka写入消息的补偿操作通常比较复杂。
// 方案1:如果业务允许,可以什么都不做。因为消息根本没写入成功。
// 方案2:如果action可能有部分成功(例如Kafka集群脑裂),则可能需要写入一个“作废”消息。
// 在这个编排式Saga中,action失败时,消息并未成功写入,因此补偿操作是空操作。
logger.warn("Compensation for StageForHadoop is a no-op as the action is transactional. Saga ID: {}", context.getSagaId());
return CompletableFuture.completedFuture(null);
}
}
将它们组合起来,计量服务的入口点看起来是这样的。
// MeteringService entrypoint
public class MeteringService {
private final SagaOrchestrator<MeteringEvent> orchestrator;
public MeteringService() {
// ... 初始化 KafkaProducer, ExecutorService等 ...
ExecutorService executor = Executors.newFixedThreadPool(10);
KafkaProducer<String, MeteringEvent> producer = createKafkaProducer();
List<SagaStep<MeteringEvent>> steps = List.of(
new PersistToPrometheusStep("http://prometheus:9090/api/v1/write"),
new StageForHadoopStep(producer, "metering-events-topic")
);
this.orchestrator = new SagaOrchestrator<>(steps, executor);
}
public CompletableFuture<Void> processEvent(MeteringEvent event) {
return orchestrator.execute(event);
}
// ... other methods, e.g., createKafkaProducer ...
}
架构的扩展性与局限性
这个基于Saga的架构,通过在应用层实现事务逻辑,确保了数据在实时监控系统(Prometheus)和离线分析系统(Hadoop)之间的最终一致性。它将一个复杂的分布式一致性问题,分解为一系列可管理、可补偿的本地事务。
然而,该方案并非没有代价。首先,它引入了额外的协调逻辑,增加了系统的复杂性和请求处理的延迟。Saga协调器本身成为一个需要高可用保障的关键组件。其次,补偿操作的设计是整个方案的难点。并非所有操作都易于补偿,如PersistToPrometheusStep
的例子所示,我们有时只能采用“冲销”这类逻辑补偿,这会对下游的数据查询和处理逻辑产生影响,查询时必须同时考虑原始指标和冲销指标。
未来的迭代可以考虑引入成熟的Saga框架(如Axon, Camunda),它们提供了更完善的状态管理、持久化和失败恢复机制。此外,对于补偿逻辑的监控和告警至关重要,任何补偿失败都意味着数据进入了不确定状态,必须触发人工干预流程。此架构的适用边界在于那些对数据完整性有强要求,但可以容忍毫秒到秒级延迟,并接受最终一致性模型的场景。