Pulsar 对比 Redis Streams 在大规模 WebRTC 状态同步架构中的实现


一个常见的误区是认为 WebRTC 应用的瓶颈在于信令服务器的并发连接数或媒体服务器的带宽。在真实项目中,当交互复杂度提升时,真正的技术壁垒会迅速转移到分布式实时状态管理上。设想一个上千人参与的在线协作白板或一个大型互动教学场景,单纯的信令交换(SDP、ICE Candidate)只是入场券,如何保证每一个参与者看到的“画笔轨迹”、“激光笔位置”、“讲师翻页”等状态在毫秒级延迟内严格一致,且系统具备水平扩展和容错能力,这才是核心挑战。

问题本质上是一个多生产者、多消费者的分布式有序日志问题。每个客户端既是状态的生产者(例如,移动鼠标),也是其他所有客户端状态的消费者。直接通过信令服务器进行广播转发是一种简单实现,但在规模扩大后会立刻遇到瓶颈:

  1. 单点压力: 所有状态变更流量都经过信令服务器,使其成为性能和可用性的瓶颈。
  2. 状态一致性: 在多个信令服务器实例的分布式部署下,保证所有客户端收到的状态变更顺序一致,变得极其困难。
  3. 扩展性受限: 业务逻辑与信令逻辑紧耦合,增加新功能(如状态录制、分析)意味着要修改核心的信令服务。

为了解决这个问题,我们需要将状态同步的职责从信令服务器中剥离出来,引入一个专门的分布式消息系统。信令服务器退化为纯粹的客户端网关和消息管道。业界常用的轻量级方案是 Redis Streams,而更重量级的选项则是 Apache Pulsar。

方案 A: Redis Streams 作为状态总线

Redis Streams 提供了一个轻量级的、持久化的日志数据结构,非常适合作为实时状态同步的后端。

优势:

  • 极低延迟: 基于内存操作,XADDXREADGROUP 的性能非常出色。
  • 生态熟悉: 多数团队已有 Redis 的运维经验,引入成本低。
  • 消费者组: 内置的消费者组机制天然支持了信令服务器集群的水平扩展和消息处理的负载均衡。

劣势:

  • 有限的持久化与存储: 虽然可以持久化到 RDB/AOF,但它不是为海量、长期数据存储设计的。实现类似 Pulsar 的分层存储相当复杂。对于需要“回放”历史状态的场景,能力有限。
  • 集群与扩展性: Redis Cluster 的管理和扩展比专业的分布式消息队列(MQ)更复杂,尤其是在跨地域容灾方面。
  • 功能集: 缺少多租户、Schema 管理、多协议支持等企业级 MQ 特性。

核心实现概览 (Node.js + ioredis)

假设我们为每个 WebRTC 房间(roomId)创建一个独立的 Stream Key。信令服务器在收到客户端的状态更新时,将其写入对应的 Stream。

// redisClient.js - 生产级 Redis 客户端配置
const Redis = require('ioredis');

const redisConfig = {
    host: process.env.REDIS_HOST || '127.0.0.1',
    port: parseInt(process.env.REDIS_PORT || '6379', 10),
    password: process.env.REDIS_PASSWORD,
    db: 0,
    maxRetriesPerRequest: 3,
    enableReadyCheck: true,
    retryStrategy(times) {
        const delay = Math.min(times * 100, 2000); // 指数退避
        return delay;
    },
};

const redisClient = new Redis(redisConfig);

redisClient.on('error', (err) => {
    console.error('[Redis Client Error]', err);
});

redisClient.on('connect', () => {
    console.log('[Redis Client] Connected successfully.');
});

module.exports = redisClient;

信令服务器中的状态发布逻辑:

// signalingServer.js (部分)
const redisClient = require('./redisClient');

/**
 * 将客户端的状态变更发布到 Redis Stream
 * @param {string} roomId - 房间ID, 用作 Stream 的 key
 * @param {object} stateUpdate - 状态更新对象
 */
async function publishStateUpdate(roomId, stateUpdate) {
    const streamKey = `room:state:${roomId}`;
    try {
        // 使用 JSON 字符串存储状态,确保原子性
        const statePayload = JSON.stringify(stateUpdate);
        
        // XADD streamKey * field value [field value ...]
        // '*' 表示让 Redis 自动生成消息 ID
        const messageId = await redisClient.xadd(
            streamKey, 
            '*', 
            'payload', 
            statePayload
        );
        
        // 生产环境中,这里应有详细日志记录
        // console.log(`State update for room ${roomId} published with ID ${messageId}`);

    } catch (error) {
        console.error(`[Redis XADD Error] Failed to publish state for room ${roomId}:`, error);
        // 实际项目中需要加入重试或告警逻辑
        throw error;
    }
}

每个信令服务器实例作为一个消费者组的成员,消费所有房间的状态更新。

// stateConsumer.js
const redisClient = require('./redisClient');

const CONSUMER_GROUP = 'signaling_servers';
const CONSUMER_NAME_PREFIX = 'consumer_';

// Map<roomId, Set<WebSocketConnection>>
const roomConnections = new Map(); // 假设这个 Map 维护了房间和 WebSocket 连接的映射

async function setupConsumerGroup(streamKey) {
    try {
        // XGROUP CREATE streamKey groupName $ [MKSTREAM]
        // MKSTREAM 选项会在 stream 不存在时自动创建
        await redisClient.xgroup('CREATE', streamKey, CONSUMER_GROUP, '0-0', 'MKSTREAM');
    } catch (error) {
        // 如果组已存在,Redis 会返回错误,这是预期的行为
        if (!error.message.includes('BUSYGROUP')) {
            console.error(`[Redis XGROUP CREATE Error] Failed for stream ${streamKey}:`, error);
            throw error;
        }
    }
}

async function consumeStateUpdates() {
    const consumerName = `${CONSUMER_NAME_PREFIX}${require('os').hostname()}_${process.pid}`;
    const streamKeys = await redisClient.keys('room:state:*');
    
    // 实际项目中不应该用 KEYS,这里仅为示例。应有服务发现机制。
    // 应该为所有可能出现的 stream key 初始化消费者组
    for (const key of streamKeys) {
        await setupConsumerGroup(key);
    }
    
    // 启动一个长轮询来消费消息
    while (true) {
        try {
            // XREADGROUP GROUP groupName consumerName [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
            const response = await redisClient.xreadgroup(
                'GROUP', CONSUMER_GROUP, consumerName,
                'BLOCK', 5000, // 阻塞5秒等待新消息
                'STREAMS', 'room:state:*', '>' // '>' 表示消费尚未传递给任何消费者的消息
            );

            if (response) {
                for (const [streamKey, messages] of response) {
                    const roomId = streamKey.split(':')[2];
                    for (const [messageId, fields] of messages) {
                        await processMessage(roomId, messageId, fields);
                    }
                }
            }
        } catch (error) {
            console.error('[Redis XREADGROUP Error]', error);
            await new Promise(resolve => setTimeout(resolve, 5000)); // 发生错误后等待一段时间再重试
        }
    }
}

async function processMessage(roomId, messageId, fields) {
    try {
        const payloadIndex = fields.indexOf('payload');
        if (payloadIndex === -1) {
             console.warn(`[Message Format Error] No payload found for message ${messageId}`);
             return;
        }
        
        const stateUpdate = JSON.parse(fields[payloadIndex + 1]);
        const connections = roomConnections.get(roomId);

        if (connections) {
            // 将状态更新广播给房间内的所有客户端
            connections.forEach(ws => {
                if (ws.readyState === ws.OPEN) {
                    ws.send(JSON.stringify({ type: 'state-update', data: stateUpdate }));
                }
            });
        }

        // 确认消息处理完成
        await redisClient.xack(`room:state:${roomId}`, CONSUMER_GROUP, messageId);

    } catch (err) {
        console.error(`[Message Processing Error] Failed to process message ${messageId}:`, err);
        // 错误处理:可以实现重试逻辑,或者将失败的消息记录到死信队列
    }
}

// 启动消费者
// consumeStateUpdates();

这里的核心问题是 room:state:* 模式在 XREADGROUP 中并不直接支持。实际实现中,每个消费者需要明确知道自己要监听哪些流,或者轮询所有活动的流。这增加了系统的复杂性。

方案 B: Pulsar 作为分布式状态日志

Pulsar 被设计为一个云原生的分布式消息平台,提供了比 Redis Streams 更强大的功能集。

优势:

  • 计算与存储分离架构: BookKeeper 负责持久化存储,Broker 负责消息处理。这种架构带来了极高的水平扩展能力和数据持久性保障。
  • 分层存储: Pulsar 可以将老旧数据自动卸载到 S3、HDFS 等廉价存储中,既保证了数据可访问性(用于回放、分析),又降低了成本。
  • Key_Shared 订阅模式: 这是解决我们问题的关键。Key_Shared 模式可以保证携带相同 key 的消息被路由到同一个消费者。我们可以用 roomId 作为消息的 key,这样同一个房间的所有状态更新都会被同一个信令服务器实例按顺序处理,完美解决了状态一致性问题。
  • 多租户与地理复制: 对于构建全球性的实时应用,Pulsar 内置的这些功能是巨大的优势。

劣势:

  • 运维复杂度: 部署和维护一个 Pulsar 集群(Zookeeper, BookKeeper, Broker, Proxy)比 Redis 复杂得多。
  • 资源消耗: 相比 Redis,Pulsar 的资源占用更高。
  • 生态与客户端成熟度: 虽然发展迅速,但其客户端库和社区工具的丰富程度相比 Redis 还有差距。

架构决策

考虑到项目对未来规模的预期、对状态严格有序的需求以及可能扩展的全球化部署,我们决定采用 Pulsar。虽然初期运维成本更高,但其架构上的优势能够从根本上解决大规模实时状态同步的难题,避免了未来因技术选型限制而进行的痛苦重构。Key_Shared 订阅是这次选型的决定性因素。

graph TD
    subgraph Clients
        C1(Client 1)
        C2(Client 2)
        C3(Client 3)
    end

    subgraph Signaling Cluster
        S1(Signaling Server 1)
        S2(Signaling Server 2)
    end

    subgraph Pulsar Cluster
        B1(Broker 1)
        B2(Broker 2)
        BK(BookKeeper Ensemble)
    end

    T[Pulsar Topic: persistent://public/default/room-states]

    C1 -- WebSocket --> S1
    C2 -- WebSocket --> S2
    C3 -- WebSocket --> S1

    S1 -- Publishes state changes (key=roomId) --> B1
    S2 -- Publishes state changes (key=roomId) --> B2

    B1 -- Stores data --> BK
    B2 -- Stores data --> BK

    B1 --> T
    B2 --> T

    S1 -- Key_Shared Consumer --> T
    S2 -- Key_Shared Consumer --> T

上图展示了整体架构。客户端通过 WebSocket 连接到任意一个信令服务器。状态变更被附加 roomId 作为 key 发布到 Pulsar 的同一个 topic。Pulsar 的 Key_Shared 订阅确保了 room-A 的所有消息只会被 S1S2 中的一个消费,而 room-B 的消息可能会被另一个消费,但同一个房间的消息始终有序。

核心实现概览 (Node.js + pulsar-client)

// pulsarClient.js - 生产级 Pulsar 客户端配置
const Pulsar = require('pulsar-client');

const pulsarConfig = {
    serviceUrl: process.env.PULSAR_URL || 'pulsar://localhost:6650',
    operationTimeoutSeconds: 30,
    // 对于生产环境,应配置认证
    // authentication: new Pulsar.AuthenticationToken({ token: 'YOUR_JWT_TOKEN' }),
};

let client;
let producer;
let consumer;

async function getPulsarClient() {
    if (!client) {
        try {
            client = new Pulsar.Client(pulsarConfig);
            console.log('[Pulsar Client] Connection initiated.');
        } catch (error) {
            console.error('[Pulsar Client] Failed to create client:', error);
            process.exit(1); // 关键基础设施失败,直接退出
        }
    }
    return client;
}

async function getProducer() {
    if (!producer) {
        const pulsarClient = await getPulsarClient();
        try {
            producer = await pulsarClient.createProducer({
                topic: 'persistent://public/default/room-states',
                sendTimeoutMs: 10000,
                batchingEnabled: true,
                batchingMaxPublishDelayMs: 10, // 优化延迟
            });
            console.log('[Pulsar Producer] Created successfully.');
        } catch (error) {
            console.error('[Pulsar Producer] Failed to create producer:', error);
            process.exit(1);
        }
    }
    return producer;
}

async function startConsumer(messageHandler) {
    if (!consumer) {
        const pulsarClient = await getPulsarClient();
        try {
            consumer = await pulsarClient.subscribe({
                topic: 'persistent://public/default/room-states',
                subscription: 'signaling-servers-subscription',
                subscriptionType: 'KeyShared', // 关键!
                ackTimeoutMs: 30000,
                receiverQueueSize: 1000,
            });
            console.log('[Pulsar Consumer] Subscribed successfully with Key_Shared mode.');

            // 持续监听消息
            while (true) {
                const msg = await consumer.receive();
                try {
                    await messageHandler(msg);
                    await consumer.acknowledge(msg);
                } catch (error) {
                    console.error(`[Pulsar Message Handler Error] Failed to process message ${msg.getMessageId()}:`, error);
                    // Nack 会让消息在一段时间后被重新投递
                    consumer.negativeAcknowledge(msg);
                }
            }

        } catch (error) {
            console.error('[Pulsar Consumer] Failed to subscribe:', error);
            process.exit(1);
        }
    }
    return consumer;
}

async function closePulsar() {
    if (producer) await producer.close();
    if (consumer) await consumer.close();
    if (client) await client.close();
    console.log('[Pulsar Client] All connections closed.');
}

module.exports = { getProducer, startConsumer, closePulsar };

信令服务器的发布与消费逻辑:

// signalingServerWithPulsar.js
const { getProducer, startConsumer, closePulsar } = require('./pulsarClient');
const { server } = require('./webSocketServer'); // 假设 WebSocket 服务器已创建

const roomConnections = new Map(); // 同样维护房间与连接的映射

/**
 * 发布状态更新到 Pulsar
 * @param {string} roomId
 * @param {object} stateUpdate
 */
async function publishStateUpdatePulsar(roomId, stateUpdate) {
    try {
        const producer = await getProducer();
        await producer.send({
            data: Buffer.from(JSON.stringify(stateUpdate)),
            partitionKey: roomId, // 使用 roomId 作为 partitionKey/orderingKey 保证顺序
            orderingKey: roomId, // 在Pulsar中,orderingKey 配合 Key_Shared 订阅更佳
        });
    } catch (error) {
        console.error(`[Pulsar Send Error] Failed to publish state for room ${roomId}:`, error);
        // 这里应有更复杂的错误处理,如降级或告警
    }
}

/**
 * Pulsar 消息处理器
 * @param {Pulsar.Message} msg
 */
async function pulsarMessageHandler(msg) {
    const roomId = msg.getOrderingKey();
    const stateUpdate = JSON.parse(msg.getData().toString());

    // 单元测试思路:
    // 1. 模拟收到一条消息,验证 roomConnections 中对应的 ws.send 是否被调用。
    // 2. 模拟 ws.send 抛出异常,验证不会影响后续消息处理和 ack。
    // 3. 模拟一个房间没有连接,验证不会发生错误。

    const connections = roomConnections.get(roomId);
    if (connections && connections.size > 0) {
        const message = JSON.stringify({ type: 'state-update', data: stateUpdate });
        connections.forEach(ws => {
            // 检查连接是否仍然存活
            if (ws.readyState === ws.OPEN) {
                ws.send(message);
            }
        });
    }
}

// 启动服务器
async function main() {
    // 启动 WebSocket 服务器和连接管理逻辑...
    
    // 启动 Pulsar 消费者
    startConsumer(pulsarMessageHandler).catch(err => {
        console.error("Consumer loop failed", err);
        process.exit(1);
    });

    // 优雅停机
    process.on('SIGINT', async () => {
        console.log('Shutting down gracefully...');
        await closePulsar();
        server.close();
        process.exit(0);
    });
}

main();

架构的扩展性与局限性

采用 Pulsar 的架构,其扩展性非常出色。如果我们需要增加一个“房间录制”服务,只需创建一个新的、使用 SharedFailover 订阅类型的消费者组,独立消费 room-states topic 的数据即可,对现有的信令服务集群毫无影响。如果需要做数据分析,同样可以接入 Flink 或 Spark 连接器,消费这个 topic 的数据流。

然而,该架构也存在局限性:

  1. 端到端延迟: 消息路径变为 Client -> Signaling Server -> Pulsar -> Signaling Server -> Other Clients。这个 RTT(往返时间)相比直接在信令服务器内部转发要高。对于 FPS 游戏这类对延迟极其敏感的应用,可能需要更激进的优化,甚至采用 UDP + 自定义状态同步协议。
  2. 信令服务器的连接管理: 虽然状态处理的瓶颈解决了,但单个信令服务器实例能维护的 WebSocket 连接数仍然是有限的。当集群规模变得非常大时,一个房间的参与者可能分布在多个信令服务器实例上。此时,消费到状态更新的那个信令服务器实例,需要一种机制去通知其他实例,让它们转发消息给各自管理的客户端。这通常可以通过 Redis Pub/Sub 或另一个专用的 Pulsar topic 来实现。
  3. 客户端状态管理: 服务器保证了消息的有序投递,但客户端也需要正确地处理这些状态更新。网络抖动可能导致客户端收到的消息乱序。因此,客户端的状态管理逻辑需要具备处理基于序列号或时间戳的乱序消息的能力,保证最终状态的一致性。

最终,没有任何架构是银弹。选择 Pulsar 是基于对未来规模和业务复杂度的前瞻性投资。它将状态管理这个核心问题,从应用层下沉到了一个可靠的、可水平扩展的基础设施层,为上层业务的快速迭代和演进奠定了坚实的基础。


  目录