构建基于 Gatsby 静态外壳与 Axum WebSocket 的高频数据实时渲染架构


项目的需求听起来自相矛盾:我们需要一个仪表盘,它必须拥有静态站点级别的首屏加载速度(LCP 小于 1.5s),并且对 SEO 友好,因为部分视图需要被搜索引擎索引。同时,这个仪表盘的核心功能是实时展示一个高频更新的数据流,频率可能达到每秒上千次的数据点变更。

传统的单页应用(SPA)方案,在首屏性能和 SEO 上有天然劣势,直接被否决。而纯静态站点生成(SSG)方案,如 Gatsby 或 Next.js 的 SSG 模式,虽然能完美解决前者,但其设计初衷并非为了处理大规模的实时数据。一个静态页面如何在加载后“活”过来,变成一个高性能的实时终端?

这就是我们面临的架构挑战。我们的方案选择了一条混合路径:利用 Gatsby 构建一个极致优化的静态“外壳”,一旦页面在客户端完成水合(hydration),立即与后端建立 WebSocket 连接,接管页面的动态部分。后端技术栈我们选择了 Rust 和 Axum,以确保能从容应对海量的并发连接和高吞吐量的数据推送。前端的状态管理则交给了 MobX,利用其精细化的响应机制来消化数据洪流,避免 UI 崩溃。

第一步:设计稳固的 Axum WebSocket 后端

一切的起点是后端。如果后端无法稳定、低延迟地推送数据,前端再怎么优化也是无源之水。选择 Rust 和 Axum 的理由很明确:内存安全、无 GC 停顿以及基于 Tokio 的卓越异步性能。这对于一个长连接、高并发的服务至关重要。

首先是项目结构和依赖配置。

Cargo.toml:

[package]
name = "realtime-axum-server"
version = "0.1.0"
edition = "2021"

[dependencies]
axum = { version = "0.6", features = ["ws"] }
tokio = { version = "1.0", features = ["full"] }
futures-util = { version = "0.3", default-features = false, features = ["sink", "stream"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
uuid = { version = "1.4", features = ["v4", "serde"] }
dashmap = "5.5"
once_cell = "1.18"
rand = "0.8"

我们的核心是维护一个所有已连接客户端的共享状态。在真实项目中,直接使用 Mutex<HashMap<...>> 会在广播时导致锁争用,成为性能瓶颈。DashMap 提供了一个分片锁的并发 HashMap,是这类场景的理想选择。

我们定义一个 AppState 来持有这个客户端列表。

src/main.rs:

use axum::{
    extract::{
        ws::{Message, WebSocket, WebSocketUpgrade},
        State,
    },
    response::IntoResponse,
    routing::get,
    Router,
};
use dashmap::DashMap;
use futures_util::{stream::StreamExt, SinkExt};
use std::{net::SocketAddr, sync::Arc, time::Duration};
use tokio::sync::broadcast;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use uuid::Uuid;
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};

// 使用 once_cell 来创建一个全局的 shutdown channel
static SHUTDOWN_TX: Lazy<broadcast::Sender<()>> = Lazy::new(|| {
    let (tx, _) = broadcast::channel(1);
    tx
});

#[derive(Debug, Serialize, Clone)]
struct DataPoint {
    id: String,
    value: f64,
    timestamp: u64,
}

// 客户端状态,这里我们只关心发送通道
struct Client {
    tx: tokio::sync::mpsc::UnboundedSender<Message>,
}

// 共享的应用状态
type AppState = Arc<DashMap<Uuid, Client>>;

#[tokio::main]
async fn main() {
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| "realtime_axum_server=debug,tower_http=debug".into()),
        )
        .with(tracing_subscriber::fmt::layer())
        .init();

    let clients: AppState = Arc::new(DashMap::new());

    // 启动数据生成器
    tokio::spawn(data_generator(clients.clone()));

    let app = Router::new()
        .route("/ws", get(ws_handler))
        .with_state(clients);

    let addr = SocketAddr::from(([127, 0, 0, 1], 3001));
    tracing::debug!("listening on {}", addr);

    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .with_graceful_shutdown(async {
            // 等待全局的 shutdown 信号
            let mut rx = SHUTDOWN_TX.subscribe();
            let _ = rx.recv().await;
            tracing::info!("Graceful shutdown initiated.");
        })
        .await
        .unwrap();
}

async fn ws_handler(
    ws: WebSocketUpgrade,
    State(state): State<AppState>,
) -> impl IntoResponse {
    ws.on_upgrade(|socket| handle_socket(socket, state))
}

async fn handle_socket(socket: WebSocket, state: AppState) {
    let client_id = Uuid::new_v4();
    let (mut socket_sender, mut socket_receiver) = socket.split();

    // 为每个客户端创建一个 MPSC channel,这是关键
    // 避免了在广播时直接操作 socket,降低锁竞争
    let (client_tx, mut client_rx) = tokio::sync::mpsc::unbounded_channel();

    state.insert(client_id, Client { tx: client_tx });
    tracing::info!("New client connected: {}", client_id);

    // 这个 tokio task 负责将 MPSC channel 中的消息写入真实的 WebSocket
    tokio::spawn(async move {
        while let Some(msg) = client_rx.recv().await {
            if socket_sender.send(msg).await.is_err() {
                // 写入失败,说明客户端已断开
                break;
            }
        }
    });

    // 这个循环负责处理从客户端接收到的消息
    while let Some(Ok(msg)) = socket_receiver.next().await {
        if let Message::Close(_) = msg {
            break;
        }
        // 在此可以处理客户端发来的消息,例如心跳或请求
    }

    // 清理工作
    state.remove(&client_id);
    tracing::info!("Client disconnected: {}", client_id);
}

// 数据生成器模拟高频数据源
async fn data_generator(clients: AppState) {
    let mut interval = tokio::time::interval(Duration::from_millis(100)); // 每 100ms 生成一批数据
    loop {
        interval.tick().await;

        let data_points: Vec<DataPoint> = (0..50) // 每次生成50个数据点
            .map(|_| {
                let id = format!("item_{}", (rand::random::<u8>() % 100)); // 模拟100个不同的数据源
                DataPoint {
                    id,
                    value: rand::random::<f64>() * 1000.0,
                    timestamp: std::time::SystemTime::now()
                        .duration_since(std::time::UNIX_EPOCH)
                        .unwrap()
                        .as_secs(),
                }
            })
            .collect();

        let payload = serde_json::to_string(&data_points).unwrap();
        let message = Message::Text(payload);
        
        // 遍历所有客户端并发送消息
        // DashMap 的迭代器是安全的
        clients.iter().for_each(|entry| {
            let client = entry.value();
            // 使用 mpsc channel 发送,是非阻塞的,而且不会因为某个慢客户端阻塞整个广播
            if let Err(e) = client.tx.send(message.clone()) {
                tracing::warn!("Failed to send message to client: {}", e);
            }
        });
    }
}

这里的核心设计有几点:

  1. 并发安全的客户端管理: 使用 DashMap 管理客户端,它在并发读写场景下性能远超 Mutex<HashMap>
  2. 解耦的收发逻辑: handle_socket 函数中,我们将 WebSocket 分割成 senderreceiver。更重要的是,我们为每个客户端创建了一个 mpsc (multi-producer, single-consumer) channel。广播时,data_generator 只是将消息推送到这些 channel 中,这个操作极快且无锁。每个客户端有一个独立的 tokio::spawn 任务负责从自己的 channel 中取出消息并写入 socket。这避免了因为某个客户端网络缓慢而阻塞整个广播循环。
  3. 独立的任务: 数据生成是一个独立的 tokio 任务,与处理连接的主流程分离,符合 actor 模式的思想。
  4. 优雅停机: 通过一个全局的 broadcast::channel 来触发 with_graceful_shutdown,这在生产环境中是必须的,确保服务重启时能正确关闭现有连接。

第二步:Gatsby 静态外壳的搭建

Gatsby 部分相对直接。我们创建一个标准的 Gatsby 项目,并定义一个页面用于承载我们的实时仪表盘。

src/pages/dashboard.js:

import React, { useEffect, useState } from 'react';
import { observer } from 'mobx-react-lite';
import dataStore from '../stores/DataStore';
import DataGrid from '../components/DataGrid';

const DashboardPage = () => {
    const [connectionStatus, setConnectionStatus] = useState('Connecting...');

    useEffect(() => {
        // 确保只在浏览器环境中执行
        if (typeof window !== 'undefined') {
            const connect = () => {
                const ws = new WebSocket('ws://127.0.0.1:3001/ws');

                ws.onopen = () => {
                    console.log('WebSocket connection established.');
                    setConnectionStatus('Connected');
                };

                ws.onmessage = (event) => {
                    try {
                        const data = JSON.parse(event.data);
                        // 这是关键:将数据处理委托给 MobX store
                        dataStore.updateDataPoints(data);
                    } catch (error) {
                        console.error('Error parsing WebSocket message:', error);
                    }
                };

                ws.onclose = () => {
                    console.log('WebSocket connection closed. Reconnecting...');
                    setConnectionStatus('Reconnecting...');
                    // 实现一个简单的重连机制
                    setTimeout(connect, 3000); 
                };

                ws.onerror = (error) => {
                    console.error('WebSocket error:', error);
                    ws.close();
                };
            };

            connect();
        }

        // 组件卸载时不需要清理,因为我们希望连接保持
        // 除非应用逻辑需要
    }, []);

    return (
        <main>
            <title>Real-time Dashboard</title>
            <h1>High-Frequency Data Stream</h1>
            <p>Connection Status: {connectionStatus}</p>
            <p>Total Items: {dataStore.itemCount}</p>
            {/* DataGrid 组件将负责渲染 */}
            <DataGrid />
        </main>
    );
};

// 使用 observer 包裹页面组件
export default observer(DashboardPage);

这段代码的核心在于 useEffect 钩子。它负责在客户端(typeof window !== 'undefined')建立 WebSocket 连接。当收到消息时,它不做任何复杂的 state 操作,而是直接调用 dataStore.updateDataPoints。这种职责分离是保持组件简洁和高性能的关键。

第三’步:用 MobX 驯服数据洪流

这是整个前端架构的灵魂。如果直接使用 React 的 useState 来处理每秒几百甚至上千次的更新,整个应用会因为根组件的反复渲染而卡死。我们需要一个能实现精细化、原子级更新的方案。

MobX 的工作原理正合此意。它通过追踪数据在何处被使用,来构建一个精确的依赖图。当一个 observable 数据变化时,只有依赖了这个特定数据的 observer 组件会重新渲染。

src/stores/DataStore.js:

import { makeAutoObservable, runInAction, observable } from 'mobx';

class DataStore {
    // 使用 observable.map 是性能关键
    // 它对键的增删改查操作是 O(1),且 MobX 对其做了深度优化
    dataPoints = observable.map();

    constructor() {
        makeAutoObservable(this);
    }

    // 计算属性会被自动缓存,只有依赖项变化时才重新计算
    get itemCount() {
        return this.dataPoints.size;
    }

    // 这是一个 action,所有对 state 的修改都应在 action 中进行
    updateDataPoints(points) {
        // runInAction 会将多次 state 修改合并为一次“突变”
        // 从而只触发一次UI更新,这是至关重要的性能优化
        runInAction(() => {
            if (!Array.isArray(points)) {
                console.warn("Received non-array data:", points);
                return;
            }
            points.forEach(point => {
                // 如果已存在,则更新;否则,新增
                // MobX 会精确地通知观察者
                const existing = this.dataPoints.get(point.id);
                if (existing) {
                    // 只更新变化的字段,避免不必要的对象重构
                    existing.value = point.value;
                    existing.timestamp = point.timestamp;
                } else {
                    this.dataPoints.set(point.id, {
                        id: point.id,
                        value: point.value,
                        timestamp: point.timestamp,
                    });
                }
            });
        });
    }
}

const dataStore = new DataStore();
export default dataStore;

MobX store 的设计要点:

  1. observable.map(): 对于一个由 ID 索引的集合,observable.map 远比数组 [] 高效。当单个条目更新时,MobX 知道只需通知观察这个特定条目的组件,而不是整个列表。
  2. runInAction: 后端一次性推送了50个数据点的更新。如果在 forEach 循环中每次 set 或修改都触发一次UI更新,那将是一场灾难。runInAction 将这50次修改打包成一个原子操作,在所有修改完成后,才通知观察者进行一次渲染。这是应对批量更新的“银弹”。
  3. get itemCount(): computed 属性(通过 get 关键字定义)是 MobX 的另一个法宝。它会被自动缓存,只有当 this.dataPoints.size 发生变化时,才会重新计算。

最后,是渲染组件的设计。我们需要将渲染压力分散到最小的单元。

src/components/DataGrid.js:

import React from 'react';
import { observer } from 'mobx-react-lite';
import dataStore from '../stores/DataStore';
import { useMemo } from 'react';

// 单个数据行的组件
const DataRow = observer(({ dataPoint }) => {
    // 这个组件非常小,只依赖于单个 dataPoint 对象
    // 当 dataPoint.value 或 dataPoint.timestamp 变化时,只有这个组件会重渲染
    // 而不是整个 DataGrid
    return (
        <div style={{ display: 'flex', justifyContent: 'space-between', padding: '2px 5px' }}>
            <span>{dataPoint.id}</span>
            <span style={{ color: dataPoint.value > 500 ? 'green' : 'red' }}>
                {dataPoint.value.toFixed(2)}
            </span>
            <span>{new Date(dataPoint.timestamp * 1000).toLocaleTimeString()}</span>
        </div>
    );
});

const DataGrid = observer(() => {
    // 这里的 toJS(dataStore.dataPoints) 或 [...dataStore.dataPoints.values()]
    // 是一个常见的反模式,因为它会在每次任何数据点变化时,
    // 创建一个新数组,导致整个列表重新渲染。

    // 正确的做法是直接迭代 observable map 的 key
    const sortedKeys = useMemo(() => {
        return Array.from(dataStore.dataPoints.keys()).sort();
    }, [dataStore.itemCount]); // 只在条目数量变化时重新排序

    return (
        <div style={{ border: '1px solid #ccc', fontFamily: 'monospace', height: '400px', overflowY: 'auto' }}>
            {sortedKeys.map(key => {
                // 通过 key 从 map 中获取 observable 对象
                // 并将其传递给子组件
                const dataPoint = dataStore.dataPoints.get(key);
                // React 的 key 属性必须是唯一的,这里的 id 正好
                return <DataRow key={key} dataPoint={dataPoint} />;
            })}
        </div>
    );
});

export default DataGrid;

DataGrid 组件的实现体现了 MobX 的最佳实践:

  • 父组件只负责结构: DataGrid 组件自身只依赖 dataStore.dataPoints.keys()。它负责遍历,但不关心每个数据项的具体值。
  • 子组件负责渲染: DataRow 组件被 observer 包裹,并且只接收一个 dataPoint 对象。当 dataStore 更新了 item_10 的值,只有那个 keyitem_10DataRow 实例会重新渲染。页面上的其他99行纹丝不动。这就是性能的来源。
  • 避免反模式: 我们没有将 dataPoints 转换为普通数组再传给 map。我们直接迭代 keys,然后通过 get(key) 获取原始的 observable 对象,维持了 MobX 响应式链条的完整性。

最终的架构图如下:

graph TD
    subgraph Browser
        A[Gatsby Build] -- static HTML/JS --> B(Client Loads Static Shell);
        B -- Hydration --> C{React App Takes Over};
        C -- useEffect --> D[Establish WebSocket Conn];
        D -- onmessage --> E[MobX Store Action: updateDataPoints];
        E -- runInAction --> F[Observable Map Update];
        F -- Fine-grained Notification --> G((DataRow Component Render));
        H[DataGrid Component] --> G;
    end
    subgraph Server
        I[Axum/Tokio Runtime] --> J{WebSocket Handler};
        K[Data Generator Task] -- High-frequency data --> L[Broadcast to Clients via MPSC];
        L --> J;
    end
    D <--> J;

局限与未来路径

这个架构虽然解决了最初的矛盾,但在生产环境中仍有需要完善的地方。

首先,前端的渲染性能瓶颈并未完全消失。当数据点增长到数千个时,即使每个 DataRow 的重渲染很快,DOM 中存在数千个节点本身就会带来性能压力。此时需要引入虚拟列表(Virtual List)技术,只渲染视口内可见的行。

其次,后端的数据生成是单点。在真实系统中,这可能是一个复杂的分布式流处理系统(如 Kafka -> Flink)。Axum 服务需要考虑如何优雅地处理上游的背压(backpressure)问题,以及自身的水平扩展。

最后,客户端的重连逻辑目前非常简单。一个生产级的实现需要更复杂的指数退避策略,并且在重连后需要一种机制(如通过 API 拉取快照)来弥补断连期间丢失的数据,保证状态的最终一致性。当前的方案在断连期间会丢失更新。


  目录