构建基于Saga模式连接Prometheus TSDB与Hadoop的分布式计量架构


一个看似简单的需求摆在面前:我们需要为云服务构建一个计量(Metering)系统。它必须能处理每秒数十万次的资源使用事件,为运营团队提供近乎实时的监控视图,同时,这些数据必须100%可靠地归档,用于月底的精确计费和年度的业务分析。实时性要求指向了Prometheus这样的时序数据库,而长期、海量、低成本的存储与分析则天然地选择了Hadoop生态。问题在于,如何保证一个事件既被Prometheus正确接收,又被Hadoop生态系统完整归档?

在真实项目中,任何跨多个独立系统的写入操作都隐含着数据不一致的风险。如果服务先写入Prometheus成功,但在写入Hadoop的数据管道(例如Kafka)时宕机,我们就有了监控数据,却丢失了计费数据,这是灾难性的。反之亦然。这本质上是一个分布式事务问题。

方案A:先写TSDB,后异步归档

最初的构想非常直接:计量服务接收到数据,直接通过Remote Write协议写入Prometheus集群。同时,部署一个独立的归档服务,定期(例如每小时)从Prometheus查询原始数据,批量转换为Parquet格式,再写入HDFS。

sequenceDiagram
    participant Client
    participant MeteringService
    participant Prometheus
    participant Archiver
    participant HDFS

    Client->>+MeteringService: Report Usage Event
    MeteringService->>+Prometheus: Remote Write Metrics
    Prometheus-->>-MeteringService: Ack
    MeteringService-->>-Client: OK

    loop Every 1 Hour
        Archiver->>+Prometheus: Query Raw Data
        Prometheus-->>-Archiver: Return Time Series Data
        Archiver->>Archiver: Transform to Parquet
        Archiver->>+HDFS: Write Parquet File
        HDFS-->>-Archiver: Ack
    end

这种架构的优势是简单,组件职责清晰。但它的致命缺陷在于可靠性。

  1. 数据丢失窗口:在归档服务两次执行的间隔期内,如果Prometheus节点发生不可逆的磁盘故障,这部分数据将永久丢失。对于计费系统,这是不可接受的。
  2. Prometheus压力:大规模、高频的范围查询对Prometheus是一个巨大的负担,可能影响其作为核心监控系统的稳定性。
  3. 数据一致性:无法保证查询操作能精确地、无重复、无遗漏地导出所有数据,尤其是在网络分区或服务重启的边缘情况下。

这个方案在非关键业务监控场景下或许堪用,但对于要求审计级别数据完整性的计费系统,它过于脆弱。

方案B:事件总线驱动的双写

为了解决数据丢失问题,引入消息队列(如Kafka)作为中间缓冲层是业界的标准做法。计量服务将事件写入Kafka,然后由两个独立的消费者组分别消费数据,一个写入Prometheus,另一个写入HDFS。

sequenceDiagram
    participant Client
    participant MeteringService
    participant Kafka
    participant PrometheusConsumer
    participant HadoopConsumer
    participant Prometheus
    participant HDFS

    Client->>+MeteringService: Report Usage Event
    MeteringService->>+Kafka: Produce Event
    Kafka-->>-MeteringService: Ack
    MeteringService-->>-Client: OK

    par
        PrometheusConsumer->>+Kafka: Consume Event
        Kafka-->>-PrometheusConsumer: Deliver Event
        PrometheusConsumer->>+Prometheus: Remote Write
        Prometheus-->>-PrometheusConsumer: Ack
        PrometheusConsumer->>+Kafka: Commit Offset
    and
        HadoopConsumer->>+Kafka: Consume Event
        Kafka-->>-HadoopConsumer: Deliver Event
        HadoopConsumer->>+HDFS: Write to Staging Area
        HDFS-->>-HadoopConsumer: Ack
        HadoopConsumer->>+Kafka: Commit Offset
    end

该方案通过Kafka的持久化能力,极大地提升了数据的可靠性。只要事件成功写入Kafka,后续的处理失败都可以通过重试来解决。然而,它引入了一个新的、更隐蔽的一致性问题:两个消费者是独立的事务单元。

设想一个场景:PrometheusConsumer成功消费了消息并写入了Prometheus,然后提交了Offset。但HadoopConsumer在处理同一条消息时,由于HDFS暂时不可用或自身逻辑错误,反复失败并最终被阻塞或跳过。结果是,运营仪表盘上显示了用户的使用量,但计费系统里却没有这条记录。两个系统的数据产生了永久性的分歧。

最终选择:基于Saga模式的编排式数据持久化

我们需要的是一个原子性的操作,确保“写入Prometheus”和“写入Hadoop管道”这两个动作要么都成功,要么都失败。由于这两个系统不支持两阶段提交(2PC),我们必须在应用层实现最终一致性。Saga模式是解决这类问题的经典方案。

我们将整个流程定义为一个Saga,它由一系列的本地事务(步骤)组成。每个步骤都有一个对应的补偿操作。如果任何步骤失败,Saga协调器会依次调用前面已成功步骤的补偿操作,从而在逻辑上“回滚”整个事务。

我们的Saga包含两个核心步骤:

  1. PersistToPrometheus: 将计量数据写入Prometheus。
  2. StageForHadoop: 将相同的数据写入Kafka,作为进入Hadoop生态的持久化入口。

如果PersistToPrometheus成功,但StageForHadoop失败,我们需要执行PersistToPrometheus的补偿操作。这里的坑在于,从Prometheus中精确删除一个数据点是困难且低效的。一个更务实的补偿方案是:写入一个“冲销”指标。例如,如果原始指标是service_usage_bytes{user="A"} 1024,补偿操作就是写入service_usage_bytes_correction{user="A"} -1024。这样在查询时,将原始指标和冲销指标相加,就能得到正确的结果。

核心实现概览

我们将使用Java来实现这个Saga协调器。首先定义Saga步骤的接口。

// src/main/java/com/example/metering/saga/SagaStep.java
package com.example.metering.saga;

import java.util.concurrent.CompletableFuture;

/**
 * 定义Saga流程中的一个步骤.
 * 每个步骤都包含一个正向操作 (action) 和一个补偿操作 (compensation).
 * @param <T> Saga上下文对象的类型
 */
public interface SagaStep<T> {

    /**
     * 执行正向操作.
     * @param context 包含执行所需数据的Saga上下文
     * @return 一个表示异步操作完成的CompletableFuture
     */
    CompletableFuture<Void> action(SagaContext<T> context);

    /**
     * 执行补偿操作.
     * 只有在action成功后,后续步骤失败时才会被调用.
     * @param context 包含补偿所需数据的Saga上下文
     * @return 一个表示异步补偿操作完成的CompletableFuture
     */
    CompletableFuture<Void> compensation(SagaContext<T> context);

    /**
     * 步骤的名称,用于日志和调试.
     * @return 步骤名称
     */
    String getName();
}

SagaContext是用于在Saga各步骤间传递数据的上下文对象。

// src/main/java/com/example/metering/saga/SagaContext.java
package com.example.metering.saga;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class SagaContext<T> {
    private final String sagaId;
    private final T payload;
    private final Map<String, Object> stepData; // 用于存储每个步骤的中间结果

    public SagaContext(T payload) {
        this.sagaId = UUID.randomUUID().toString();
        this.payload = payload;
        this.stepData = new ConcurrentHashMap<>();
    }

    public String getSagaId() {
        return sagaId;
    }

    public T getPayload() {
        return payload;
    }

    public void putStepData(String key, Object value) {
        this.stepData.put(key, value);
    }

    @SuppressWarnings("unchecked")
    public <V> V getStepData(String key) {
        return (V) this.stepData.get(key);
    }
}

接下来是Saga协调器的实现,它负责按顺序执行步骤并处理失败。

// src/main/java/com/example/metering/saga/SagaOrchestrator.java
package com.example.metering.saga;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Stack;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public class SagaOrchestrator<T> {

    private static final Logger logger = LoggerFactory.getLogger(SagaOrchestrator.class);

    private final List<SagaStep<T>> steps;
    private final Executor executor;

    public SagaOrchestrator(List<SagaStep<T>> steps, Executor executor) {
        this.steps = steps;
        this.executor = executor;
    }

    public CompletableFuture<Void> execute(T payload) {
        SagaContext<T> context = new SagaContext<>(payload);
        Stack<SagaStep<T>> executedSteps = new Stack<>();

        CompletableFuture<Void> future = CompletableFuture.completedFuture(null);

        for (SagaStep<T> step : steps) {
            future = future.thenComposeAsync(v -> {
                logger.info("Saga [{}]: Executing step [{}]", context.getSagaId(), step.getName());
                return step.action(context)
                        .thenAccept(result -> executedSteps.push(step))
                        .exceptionally(ex -> {
                            logger.error("Saga [{}]: Step [{}] failed. Starting compensation.", context.getSagaId(), step.getName(), ex);
                            throw new SagaExecutionException(ex, executedSteps);
                        });
            }, executor);
        }

        return future.exceptionally(ex -> {
            if (ex.getCause() instanceof SagaExecutionException) {
                SagaExecutionException sagaEx = (SagaExecutionException) ex.getCause();
                compensate(context, sagaEx.getExecutedSteps());
            } else {
                 logger.error("Saga [{}]: Unexpected error during execution.", context.getSagaId(), ex);
            }
            // 重新抛出异常,让调用方知道Saga失败了
            throw new RuntimeException("Saga execution failed and compensation was triggered.", ex);
        });
    }

    private void compensate(SagaContext<T> context, Stack<SagaStep<T>> executedSteps) {
        logger.info("Saga [{}]: Starting compensation for {} steps.", context.getSagaId(), executedSteps.size());
        while (!executedSteps.isEmpty()) {
            SagaStep<T> stepToCompensate = executedSteps.pop();
            try {
                logger.warn("Saga [{}]: Compensating for step [{}]", context.getSagaId(), stepToCompensate.getName());
                // 在真实项目中,这里的补偿失败需要有重试和告警机制
                stepToCompensate.compensation(context).join();
            } catch (Exception ex) {
                // 补偿失败是一个严重问题,必须记录并告警
                logger.error("Saga [{}]: CRITICAL - Compensation for step [{}] failed. Manual intervention required.",
                        context.getSagaId(), stepToCompensate.getName(), ex);
                // 停止补偿链,因为状态已不确定
                break;
            }
        }
    }

    private static class SagaExecutionException extends RuntimeException {
        private final Stack<SagaStep<T>> executedSteps;

        public SagaExecutionException(Throwable cause, Stack<SagaStep<T>> executedSteps) {
            super(cause);
            this.executedSteps = executedSteps;
        }

        public Stack<SagaStep<T>> getExecutedSteps() {
            return executedSteps;
        }
    }
}

现在,我们来实现具体的步骤。PersistToPrometheusStep负责与Prometheus交互。这里我们模拟一个HTTP客户端来发送Remote Write请求。在生产环境中,应该使用一个健壮的HTTP客户端库(如OkHttp)并处理连接池、超时和重试。

// src/main/java/com/example/metering/steps/PersistToPrometheusStep.java
package com.example.metering.steps;

// 假设的依赖
import com.example.metering.model.MeteringEvent;
import com.example.metering.saga.SagaContext;
import com.example.metering.saga.SagaStep;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.URI;
import java.util.concurrent.CompletableFuture;

public class PersistToPrometheusStep implements SagaStep<MeteringEvent> {

    private static final Logger logger = LoggerFactory.getLogger(PersistToPrometheusStep.class);
    private final HttpClient httpClient;
    private final String remoteWriteUrl;

    public PersistToPrometheusStep(String remoteWriteUrl) {
        this.remoteWriteUrl = remoteWriteUrl;
        this.httpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_2).build();
    }

    @Override
    public String getName() {
        return "PersistToPrometheus";
    }
    
    @Override
    public CompletableFuture<Void> action(SagaContext<MeteringEvent> context) {
        MeteringEvent event = context.getPayload();
        // 实际实现中,这里需要将MeteringEvent转换为Prometheus Remote Write的Protobuf格式
        // 并使用Snappy压缩。为简化,我们只发送一个模拟的字符串。
        String requestBody = String.format("service_usage_bytes{user=\"%s\", service=\"%s\"} %d %d",
                event.getUserId(), event.getServiceId(), event.getValue(), event.getTimestamp());

        return sendToPrometheus(requestBody, context.getSagaId());
    }

    @Override
    public CompletableFuture<Void> compensation(SagaContext<MeteringEvent> context) {
        MeteringEvent event = context.getPayload();
        // 补偿操作:写入一个负值
        String requestBody = String.format("service_usage_bytes_correction{user=\"%s\", service=\"%s\"} %d %d",
                event.getUserId(), event.getServiceId(), -event.getValue(), event.getTimestamp());
        
        logger.warn("Executing compensation for Prometheus. Saga ID: {}", context.getSagaId());
        // 补偿操作的失败后果更严重,需要更强的重试机制
        return sendToPrometheus(requestBody, context.getSagaId());
    }

    private CompletableFuture<Void> sendToPrometheus(String body, String sagaId) {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(remoteWriteUrl))
                .header("Content-Type", "text/plain") // 实际应为 "application/x-protobuf"
                .header("X-Saga-ID", sagaId)
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
                .thenAccept(response -> {
                    if (response.statusCode() >= 200 && response.statusCode() < 300) {
                        logger.info("Successfully sent data to Prometheus. Saga ID: {}, Status: {}", sagaId, response.statusCode());
                    } else {
                        // 抛出异常以触发Saga的回滚
                        logger.error("Failed to send data to Prometheus. Saga ID: {}, Status: {}, Body: {}",
                                sagaId, response.statusCode(), response.body());
                        throw new RuntimeException("Prometheus remote write failed with status " + response.statusCode());
                    }
                });
    }
}

StageForHadoopStep则负责向Kafka写入数据。

// src/main/java/com/example/metering/steps/StageForHadoopStep.java
package com.example.metering.steps;

import com.example.metering.model.MeteringEvent;
import com.example.metering.saga.SagaContext;
import com.example.metering.saga.SagaStep;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;

public class StageForHadoopStep implements SagaStep<MeteringEvent> {

    private static final Logger logger = LoggerFactory.getLogger(StageForHadoopStep.class);
    private final KafkaProducer<String, MeteringEvent> producer;
    private final String topic;

    public StageForHadoopStep(KafkaProducer<String, MeteringEvent> producer, String topic) {
        this.producer = producer;
        this.topic = topic;
    }

    @Override
    public String getName() {
        return "StageForHadoop";
    }

    @Override
    public CompletableFuture<Void> action(SagaContext<MeteringEvent> context) {
        MeteringEvent event = context.getPayload();
        ProducerRecord<String, MeteringEvent> record = new ProducerRecord<>(topic, event.getUserId(), event);
        
        CompletableFuture<Void> future = new CompletableFuture<>();
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                logger.error("Failed to send record to Kafka for saga [{}].", context.getSagaId(), exception);
                future.completeExceptionally(exception);
            } else {
                logger.info("Successfully staged record for Hadoop in topic [{}], partition [{}], offset [{}].",
                        metadata.topic(), metadata.partition(), metadata.offset());
                future.complete(null);
            }
        });
        return future;
    }

    @Override
    public CompletableFuture<Void> compensation(SagaContext<MeteringEvent> context) {
        // 向Kafka写入消息的补偿操作通常比较复杂。
        // 方案1:如果业务允许,可以什么都不做。因为消息根本没写入成功。
        // 方案2:如果action可能有部分成功(例如Kafka集群脑裂),则可能需要写入一个“作废”消息。
        // 在这个编排式Saga中,action失败时,消息并未成功写入,因此补偿操作是空操作。
        logger.warn("Compensation for StageForHadoop is a no-op as the action is transactional. Saga ID: {}", context.getSagaId());
        return CompletableFuture.completedFuture(null);
    }
}

将它们组合起来,计量服务的入口点看起来是这样的。

// MeteringService entrypoint
public class MeteringService {
    private final SagaOrchestrator<MeteringEvent> orchestrator;

    public MeteringService() {
        // ... 初始化 KafkaProducer, ExecutorService等 ...
        ExecutorService executor = Executors.newFixedThreadPool(10);
        KafkaProducer<String, MeteringEvent> producer = createKafkaProducer();

        List<SagaStep<MeteringEvent>> steps = List.of(
            new PersistToPrometheusStep("http://prometheus:9090/api/v1/write"),
            new StageForHadoopStep(producer, "metering-events-topic")
        );

        this.orchestrator = new SagaOrchestrator<>(steps, executor);
    }

    public CompletableFuture<Void> processEvent(MeteringEvent event) {
        return orchestrator.execute(event);
    }
    
    // ... other methods, e.g., createKafkaProducer ...
}

架构的扩展性与局限性

这个基于Saga的架构,通过在应用层实现事务逻辑,确保了数据在实时监控系统(Prometheus)和离线分析系统(Hadoop)之间的最终一致性。它将一个复杂的分布式一致性问题,分解为一系列可管理、可补偿的本地事务。

然而,该方案并非没有代价。首先,它引入了额外的协调逻辑,增加了系统的复杂性和请求处理的延迟。Saga协调器本身成为一个需要高可用保障的关键组件。其次,补偿操作的设计是整个方案的难点。并非所有操作都易于补偿,如PersistToPrometheusStep的例子所示,我们有时只能采用“冲销”这类逻辑补偿,这会对下游的数据查询和处理逻辑产生影响,查询时必须同时考虑原始指标和冲销指标。

未来的迭代可以考虑引入成熟的Saga框架(如Axon, Camunda),它们提供了更完善的状态管理、持久化和失败恢复机制。此外,对于补偿逻辑的监控和告警至关重要,任何补偿失败都意味着数据进入了不确定状态,必须触发人工干预流程。此架构的适用边界在于那些对数据完整性有强要求,但可以容忍毫秒到秒级延迟,并接受最终一致性模型的场景。


  目录