利用CDC、Pulsar与Neo4j构建事务一致的实时图数据管道


在真实的业务场景中,将关系型数据库(如PostgreSQL)作为核心交易系统(OLTP)的黄金标准,而使用图数据库(如Neo4j)进行深度关系分析、欺诈检测或实时推荐,是一种常见的异构持久化架构。这里的核心挑战在于如何维持两者之间的数据同步,特别是要求低延迟和事务一致性。

方案权衡:为何绕开双写与轮询

在架构选型阶段,我们评估了两种常见的同步方案:

  1. 应用层双写(Dual Writes): 业务代码在完成本地事务后,同步或异步地调用Neo4j的API写入图数据。

    • 劣势: 这是最需要规避的方案。它引入了分布式事务的复杂性。如果图数据库写入失败,如何回滚主库事务?如果使用异步消息,如何保证消息一定能成功发送?这会严重侵蚀业务代码的整洁性,并将数据一致性的保证责任强加给每一个开发者,导致系统长期可维护性极差。
  2. 定时轮询(Polling): 定时任务(例如每分钟)扫描关系型数据库中的业务表,找出自上次同步以来的数据变更(通常通过update_time字段),然后批量更新到Neo4j。

    • 劣势: 延迟是主要问题,无法满足实时性要求。其次,对源数据库的侵入性很强,高频率的轮询会给生产OLTP数据库带来不必要的查询压力。更致命的是,它无法捕获到“删除”操作,除非表中包含逻辑删除标志,且无法精确还原事务的中间状态。

这两种方案在生产环境中都存在明显的短板。因此,我们最终将目光投向了基于日志捕获的CDC(Change Data Capture)方案。它通过读取数据库的预写日志(WAL),以非侵入的方式捕MSC取所有数据变更,实现了真正的低延迟和数据完整性。

最终架构:Debezium + Pulsar + Neo4j

我们设计的最终架构利用了开源社区的成熟工具链,确保了管道的稳定性和可扩展性。

graph TD
    A[PostgreSQL] -- WAL --> B(Debezium Connector);
    B -- CDC Events --> C{Apache Pulsar};
    C -- Data & Tx Topics --> D[GraphSync Consumer Service];
    D -- Cypher Transactions --> E[Neo4j Database];
    
    subgraph Pulsar Cluster
        C
    end
    
    subgraph Kubernetes Pod
        D
    end

这个流程的核心逻辑是:

  1. Debezium PostgreSQL Connector: 作为一个Kafka Connect插件(可与Pulsar的Kafka-on-Pulsar KoP或Pulsar-IO适配),它连接到PostgreSQL的逻辑复制槽,实时读取WAL中的数据变更。
  2. Apache Pulsar: 充当高吞吐、低延迟的持久化消息总线。Debezium将数据变更事件(Insert, Update, Delete)和事务边界事件(BEGIN, END)发送到Pulsar的不同Topic中。选用Pulsar的关键在于其优秀的多租户、分层存储以及对消息顺序的强保证。
  3. GraphSync Consumer Service: 一个我们自行开发的、无状态的消费者服务。它订阅Pulsar中的相关Topic,在内存中根据事务ID重组事务内的所有DML操作,然后在收到事务END事件后,将这些操作原子性地应用到Neo4j中。

关键实现:处理事务边界

CDC最大的挑战之一是如何在下游重构源数据库的事务。一个业务操作可能涉及对多张表的多次修改,这些修改必须作为一个整体在Neo4j中生效,否则会导致图数据处于不一致的中间状态。

Debezium为此提供了事务元数据支持。我们需要在Connector配置中启用它。

Debezium Connector 配置

以下是一个部署到Pulsar IO环境的PostgreSQL Connector配置示例。注意transaction.topicprovide.transaction.metadata的设置。

{
  "name": "pg-graph-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "your-postgres-host",
    "database.port": "5432",
    "database.user": "your_user",
    "database.password": "your_password",
    "database.dbname": "your_db",
    "database.server.name": "pg_server_01",
    "plugin.name": "pgoutput",
    "table.include.list": "public.users,public.orders,public.order_items",
    
    // 核心配置:开启事务元数据
    "provide.transaction.metadata": "true",
    "transaction.topic": "pg_server_01.transaction",

    // Pulsar 相关配置 (使用Pulsar IO Sink)
    "pulsar.service.url": "pulsar://your-pulsar-broker:6650",
    
    // 类型转换与序列化
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}

配置完成后,Pulsar中会多出一个pg_server_01.transaction的Topic,用于接收事务的开始和结束信号。而public.users等表的变更事件则会发送到各自的Topic中。

事务重组消费者的实现 (Java)

我们的核心消费者GraphSyncConsumer需要同时消费数据Topic和事务Topic。这里的难点在于如何缓存并协调这些事件。一个生产级的实现需要考虑并发、内存管理和容错。

import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

public class GraphSyncConsumer {

    private static final Logger logger = LoggerFactory.getLogger(GraphSyncConsumer.class);
    private final PulsarClient pulsarClient;
    private final Neo4jWriter neo4jWriter;
    private final ObjectMapper objectMapper = new ObjectMapper();

    // Key: Transaction ID, Value: List of CDC event payloads within that transaction
    private final Map<String, List<JsonNode>> transactionBuffer = new ConcurrentHashMap<>();

    private final String DATA_TOPIC_PATTERN = "persistent://public/default/pg_server_01.public.*";
    private final String TRANSACTION_TOPIC = "persistent://public/default/pg_server_01.transaction";
    private final String SUBSCRIPTION_NAME = "graph-sync-subscription";

    public GraphSyncConsumer(String pulsarUrl, String neo4jUri, String neo4jUser, String neo4jPassword) throws PulsarClientException {
        this.pulsarClient = PulsarClient.builder().serviceUrl(pulsarUrl).build();
        this.neo4jWriter = new Neo4jWriter(neo4jUri, neo4jUser, neo4jPassword);
    }

    public void start() throws PulsarClientException {
        // 使用单线程执行器确保处理消息的顺序性,避免并发修改 transactionBuffer
        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();

        Consumer<byte[]> dataConsumer = pulsarClient.newConsumer()
                .topicsPattern(Pattern.compile(DATA_TOPIC_PATTERN))
                .subscriptionName(SUBSCRIPTION_NAME)
                .subscriptionType(SubscriptionType.Key_Shared) // Key_Shared 保证同一主键的事件由同一个消费者处理
                .messageListener((consumer, msg) -> {
                    singleThreadExecutor.submit(() -> handleDataMessage(consumer, msg));
                })
                .deadLetterPolicy(DeadLetterPolicy.builder()
                    .maxRedeliverCount(5)
                    .deadLetterTopic("persistent://public/default/dlq-graph-sync")
                    .build())
                .subscribe();

        Consumer<byte[]> transactionConsumer = pulsarClient.newConsumer()
                .topic(TRANSACTION_TOPIC)
                .subscriptionName(SUBSCRIPTION_NAME)
                .subscriptionType(SubscriptionType.Shared) // 事务消息可以被并发处理
                .messageListener((consumer, msg) -> {
                     singleThreadExecutor.submit(() -> handleTransactionMessage(consumer, msg));
                })
                .deadLetterPolicy(DeadLetterPolicy.builder()
                    .maxRedeliverCount(5)
                    .deadLetterTopic("persistent://public/default/dlq-graph-sync")
                    .build())
                .subscribe();

        logger.info("Consumers started. Listening for CDC events...");
    }

    private void handleDataMessage(Consumer<byte[]> consumer, Message<byte[]> msg) {
        try {
            JsonNode payload = objectMapper.readTree(msg.getData());
            // Debezium 事件的 'transaction' 字段包含了事务的元数据
            JsonNode transactionMeta = payload.get("payload").get("transaction");
            if (transactionMeta != null && !transactionMeta.isNull()) {
                String txId = transactionMeta.get("id").asText();
                transactionBuffer.computeIfAbsent(txId, k -> new ArrayList<>()).add(payload);
                logger.debug("Buffered event for transaction {}", txId);
            } else {
                logger.warn("Received a data message without transaction metadata. Skipping. MessageId: {}", msg.getMessageId());
            }
            consumer.acknowledge(msg);
        } catch (IOException | PulsarClientException e) {
            logger.error("Failed to process data message, nacking. MessageId: {}", msg.getMessageId(), e);
            consumer.negativeAcknowledge(msg);
        }
    }

    private void handleTransactionMessage(Consumer<byte[]> consumer, Message<byte[]> msg) {
        try {
            JsonNode payload = objectMapper.readTree(msg.getData());
            String txId = payload.get("payload").get("id").asText();
            String status = payload.get("payload").get("status").asText();

            if ("END".equals(status)) {
                List<JsonNode> events = transactionBuffer.remove(txId);
                if (events != null && !events.isEmpty()) {
                    logger.info("Processing transaction {} with {} events.", txId, events.size());
                    // 核心逻辑:原子性地将事务中的所有事件写入 Neo4j
                    neo4jWriter.applyTransaction(events);
                } else {
                    logger.info("Transaction {} ended with no data events to process.", txId);
                }
            }
            // BEGIN 事件在这里可以忽略,我们只关心 END
            consumer.acknowledge(msg);
        } catch (Exception e) {
            logger.error("Failed to process transaction message, nacking. MessageId: {}", msg.getMessageId(), e);
            // 这里失败了也需要nack,让Pulsar重试,否则可能导致数据事件永远积压在内存中
            consumer.negativeAcknowledge(msg);
        }
    }

    public void close() throws PulsarClientException {
        pulsarClient.close();
        neo4jWriter.close();
    }
}

Neo4j 写入模块的幂等性设计

消费者服务必须保证幂等性。由于网络问题或服务重启,Pulsar可能会重发消息。如果处理逻辑不幂等,就会在Neo4j中创建重复的节点或关系。

我们使用MERGE Cypher命令来实现这一点。MERGE会查找匹配指定模式的节点/关系,如果找到则返回,如果找不到则创建它。这确保了同一操作执行多次的结果和执行一次完全相同。

import org.neo4j.driver.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static org.neo4j.driver.Values.parameters;

public class Neo4jWriter implements AutoCloseable {

    private static final Logger logger = LoggerFactory.getLogger(Neo4jWriter.class);
    private final Driver driver;

    public Neo4jWriter(String uri, String user, String password) {
        this.driver = GraphDatabase.driver(uri, AuthTokens.basic(user, password));
        try (Session session = driver.session()) {
            // 创建唯一性约束,这是幂等性的基础
            session.run("CREATE CONSTRAINT user_id_unique IF NOT EXISTS FOR (u:User) REQUIRE u.id IS UNIQUE");
            session.run("CREATE CONSTRAINT order_id_unique IF NOT EXISTS FOR (o:Order) REQUIRE o.id IS UNIQUE");
            logger.info("Ensured Neo4j constraints are in place.");
        }
    }

    // 将一组CDC事件在一个Neo4j事务中执行
    public void applyTransaction(List<JsonNode> events) {
        try (Session session = driver.session()) {
            session.writeTransaction(tx -> {
                for (JsonNode event : events) {
                    processSingleEvent(tx, event);
                }
                return null; // 返回值没有实际用途
            });
            logger.info("Successfully applied batch of {} events to Neo4j.", events.size());
        } catch (Exception e) {
            // 这是关键的错误处理部分
            // 如果Neo4j事务失败,Pulsar的nack机制会触发重试
            logger.error("Failed to apply transaction to Neo4j. Events will be re-processed.", e);
            throw new RuntimeException("Neo4j transaction failed", e);
        }
    }

    private void processSingleEvent(Transaction tx, JsonNode event) {
        String op = event.get("payload").get("op").asText(); // c, u, d for create, update, delete
        JsonNode record = event.get("payload").get("after");
        if (record == null || record.isNull()) {
            record = event.get("payload").get("before"); // For delete operations
        }
        
        String tableName = event.get("payload").get("source").get("table").asText();

        switch (tableName) {
            case "users":
                handleUserEvent(tx, op, record);
                break;
            case "orders":
                handleOrderEvent(tx, op, record);
                break;
            case "order_items":
                handleOrderItemEvent(tx, op, record);
                break;
            default:
                logger.warn("Unsupported table event: {}", tableName);
        }
    }

    private void handleUserEvent(Transaction tx, String op, JsonNode data) {
        long userId = data.get("id").asLong();
        if ("d".equals(op)) {
            tx.run("MATCH (u:User {id: $id}) DETACH DELETE u", parameters("id", userId));
            logger.debug("Deleted User {}", userId);
        } else { // create or update
            Map<String, Object> props = Map.of(
                "id", userId,
                "name", data.get("name").asText(),
                "email", data.get("email").asText()
            );
            tx.run("MERGE (u:User {id: $props.id}) SET u += $props", parameters("props", props));
            logger.debug("Merged User {}", userId);
        }
    }

    private void handleOrderEvent(Transaction tx, String op, JsonNode data) {
        long orderId = data.get("id").asLong();
        if ("d".equals(op)) {
            tx.run("MATCH (o:Order {id: $id}) DETACH DELETE o", parameters("id", orderId));
        } else {
            long userId = data.get("user_id").asLong();
            Map<String, Object> props = Map.of(
                "id", orderId,
                "order_date", data.get("order_date").asText(),
                "total_amount", data.get("total_amount").asDouble()
            );
            // MERGE 订单节点,然后 MERGE 关系
            tx.run(
                "MERGE (o:Order {id: $props.id}) SET o += $props " +
                "WITH o " +
                "MATCH (u:User {id: $userId}) " +
                "MERGE (u)-[:PLACED_ORDER]->(o)",
                parameters("props", props, "userId", userId)
            );
        }
    }

    // 此处省略 handleOrderItemEvent 的实现,逻辑类似,会创建 (Order)-[:CONTAINS]->(Product) 的关系

    @Override
    public void close() {
        driver.close();
    }
}

架构的局限性与未来优化路径

尽管此架构在实时性和一致性方面表现出色,但在部署到生产环境前,仍需考虑几个现实问题:

  1. 初始快照(Initial Snapshot): CDC管道只处理增量变更。对于一个已存在大量数据的系统,如何进行首次全量同步?Debezium支持snapshot.mode,但对于TB级的大表,这可能会对源库造成压力且耗时很长。一个更稳妥的方案是,先通过离线方式(如导出CSV,使用neo4j-admin import)进行一次性的全量导入,然后启动CDC管道处理后续增量数据。这个过程需要精确协调,确保在离线导入完成和CDC启动之间没有数据丢失。

  2. Schema 演进: 当源数据库的表结构发生变化时(如ALTER TABLE),Debezium会发出相应的事件,但我们的消费者GraphSyncConsumer目前并未处理这类事件。一个简单的ADD COLUMN可能不会导致服务失败,但DROP COLUMNRENAME COLUMN则会使消费者代码抛出异常。一个更健壮的系统需要实现对Schema变更事件的解析,并采取相应策略,例如自动更新Neo4j中的节点属性,或者发送告警通知开发人员介入。

  3. 消费者横向扩展: 当前的GraphSyncConsumer实现使用单线程处理所有消息以保证事务的顺序性。当数据变更的速率非常高时,这可能成为瓶颈。Pulsar的Key_Shared订阅模式允许我们启动多个消费者实例,并确保拥有相同主键的消息被路由到同一个实例。然而,由于我们的事务重组逻辑依赖于一个集中的transactionBuffer,简单的横向扩展会破坏事务的完整性。一个可行的扩展方案是,将Debezium配置为按主键进行数据分区,并将事务事件广播到所有分区,这样每个消费者实例只负责一部分数据的事务重组。这会增加实现的复杂性,但为水平扩展提供了可能。

  4. 内存管理: transactionBuffer会在内存中缓存进行中的长事务的所有事件。如果源数据库存在运行时间非常长的事务(例如,一个持续数小时的批量更新任务),这可能会消耗大量的消费者内存,甚至导致OOM。需要为transactionBuffer增加监控和保护机制,例如限制单个事务所能缓存的事件数量,或对超大事务进行告警并进行特殊处理。


  目录