项目的需求听起来自相矛盾:我们需要一个仪表盘,它必须拥有静态站点级别的首屏加载速度(LCP 小于 1.5s),并且对 SEO 友好,因为部分视图需要被搜索引擎索引。同时,这个仪表盘的核心功能是实时展示一个高频更新的数据流,频率可能达到每秒上千次的数据点变更。
传统的单页应用(SPA)方案,在首屏性能和 SEO 上有天然劣势,直接被否决。而纯静态站点生成(SSG)方案,如 Gatsby 或 Next.js 的 SSG 模式,虽然能完美解决前者,但其设计初衷并非为了处理大规模的实时数据。一个静态页面如何在加载后“活”过来,变成一个高性能的实时终端?
这就是我们面临的架构挑战。我们的方案选择了一条混合路径:利用 Gatsby 构建一个极致优化的静态“外壳”,一旦页面在客户端完成水合(hydration),立即与后端建立 WebSocket 连接,接管页面的动态部分。后端技术栈我们选择了 Rust 和 Axum,以确保能从容应对海量的并发连接和高吞吐量的数据推送。前端的状态管理则交给了 MobX,利用其精细化的响应机制来消化数据洪流,避免 UI 崩溃。
第一步:设计稳固的 Axum WebSocket 后端
一切的起点是后端。如果后端无法稳定、低延迟地推送数据,前端再怎么优化也是无源之水。选择 Rust 和 Axum 的理由很明确:内存安全、无 GC 停顿以及基于 Tokio 的卓越异步性能。这对于一个长连接、高并发的服务至关重要。
首先是项目结构和依赖配置。
Cargo.toml:
[package]
name = "realtime-axum-server"
version = "0.1.0"
edition = "2021"
[dependencies]
axum = { version = "0.6", features = ["ws"] }
tokio = { version = "1.0", features = ["full"] }
futures-util = { version = "0.3", default-features = false, features = ["sink", "stream"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
uuid = { version = "1.4", features = ["v4", "serde"] }
dashmap = "5.5"
once_cell = "1.18"
rand = "0.8"
我们的核心是维护一个所有已连接客户端的共享状态。在真实项目中,直接使用 Mutex<HashMap<...>> 会在广播时导致锁争用,成为性能瓶颈。DashMap 提供了一个分片锁的并发 HashMap,是这类场景的理想选择。
我们定义一个 AppState 来持有这个客户端列表。
src/main.rs:
use axum::{
extract::{
ws::{Message, WebSocket, WebSocketUpgrade},
State,
},
response::IntoResponse,
routing::get,
Router,
};
use dashmap::DashMap;
use futures_util::{stream::StreamExt, SinkExt};
use std::{net::SocketAddr, sync::Arc, time::Duration};
use tokio::sync::broadcast;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use uuid::Uuid;
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
// 使用 once_cell 来创建一个全局的 shutdown channel
static SHUTDOWN_TX: Lazy<broadcast::Sender<()>> = Lazy::new(|| {
let (tx, _) = broadcast::channel(1);
tx
});
#[derive(Debug, Serialize, Clone)]
struct DataPoint {
id: String,
value: f64,
timestamp: u64,
}
// 客户端状态,这里我们只关心发送通道
struct Client {
tx: tokio::sync::mpsc::UnboundedSender<Message>,
}
// 共享的应用状态
type AppState = Arc<DashMap<Uuid, Client>>;
#[tokio::main]
async fn main() {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "realtime_axum_server=debug,tower_http=debug".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
let clients: AppState = Arc::new(DashMap::new());
// 启动数据生成器
tokio::spawn(data_generator(clients.clone()));
let app = Router::new()
.route("/ws", get(ws_handler))
.with_state(clients);
let addr = SocketAddr::from(([127, 0, 0, 1], 3001));
tracing::debug!("listening on {}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.with_graceful_shutdown(async {
// 等待全局的 shutdown 信号
let mut rx = SHUTDOWN_TX.subscribe();
let _ = rx.recv().await;
tracing::info!("Graceful shutdown initiated.");
})
.await
.unwrap();
}
async fn ws_handler(
ws: WebSocketUpgrade,
State(state): State<AppState>,
) -> impl IntoResponse {
ws.on_upgrade(|socket| handle_socket(socket, state))
}
async fn handle_socket(socket: WebSocket, state: AppState) {
let client_id = Uuid::new_v4();
let (mut socket_sender, mut socket_receiver) = socket.split();
// 为每个客户端创建一个 MPSC channel,这是关键
// 避免了在广播时直接操作 socket,降低锁竞争
let (client_tx, mut client_rx) = tokio::sync::mpsc::unbounded_channel();
state.insert(client_id, Client { tx: client_tx });
tracing::info!("New client connected: {}", client_id);
// 这个 tokio task 负责将 MPSC channel 中的消息写入真实的 WebSocket
tokio::spawn(async move {
while let Some(msg) = client_rx.recv().await {
if socket_sender.send(msg).await.is_err() {
// 写入失败,说明客户端已断开
break;
}
}
});
// 这个循环负责处理从客户端接收到的消息
while let Some(Ok(msg)) = socket_receiver.next().await {
if let Message::Close(_) = msg {
break;
}
// 在此可以处理客户端发来的消息,例如心跳或请求
}
// 清理工作
state.remove(&client_id);
tracing::info!("Client disconnected: {}", client_id);
}
// 数据生成器模拟高频数据源
async fn data_generator(clients: AppState) {
let mut interval = tokio::time::interval(Duration::from_millis(100)); // 每 100ms 生成一批数据
loop {
interval.tick().await;
let data_points: Vec<DataPoint> = (0..50) // 每次生成50个数据点
.map(|_| {
let id = format!("item_{}", (rand::random::<u8>() % 100)); // 模拟100个不同的数据源
DataPoint {
id,
value: rand::random::<f64>() * 1000.0,
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
}
})
.collect();
let payload = serde_json::to_string(&data_points).unwrap();
let message = Message::Text(payload);
// 遍历所有客户端并发送消息
// DashMap 的迭代器是安全的
clients.iter().for_each(|entry| {
let client = entry.value();
// 使用 mpsc channel 发送,是非阻塞的,而且不会因为某个慢客户端阻塞整个广播
if let Err(e) = client.tx.send(message.clone()) {
tracing::warn!("Failed to send message to client: {}", e);
}
});
}
}
这里的核心设计有几点:
- 并发安全的客户端管理: 使用
DashMap管理客户端,它在并发读写场景下性能远超Mutex<HashMap>。 - 解耦的收发逻辑:
handle_socket函数中,我们将 WebSocket 分割成sender和receiver。更重要的是,我们为每个客户端创建了一个mpsc(multi-producer, single-consumer) channel。广播时,data_generator只是将消息推送到这些 channel 中,这个操作极快且无锁。每个客户端有一个独立的tokio::spawn任务负责从自己的 channel 中取出消息并写入 socket。这避免了因为某个客户端网络缓慢而阻塞整个广播循环。 - 独立的任务: 数据生成是一个独立的
tokio任务,与处理连接的主流程分离,符合 actor 模式的思想。 - 优雅停机: 通过一个全局的
broadcast::channel来触发with_graceful_shutdown,这在生产环境中是必须的,确保服务重启时能正确关闭现有连接。
第二步:Gatsby 静态外壳的搭建
Gatsby 部分相对直接。我们创建一个标准的 Gatsby 项目,并定义一个页面用于承载我们的实时仪表盘。
src/pages/dashboard.js:
import React, { useEffect, useState } from 'react';
import { observer } from 'mobx-react-lite';
import dataStore from '../stores/DataStore';
import DataGrid from '../components/DataGrid';
const DashboardPage = () => {
const [connectionStatus, setConnectionStatus] = useState('Connecting...');
useEffect(() => {
// 确保只在浏览器环境中执行
if (typeof window !== 'undefined') {
const connect = () => {
const ws = new WebSocket('ws://127.0.0.1:3001/ws');
ws.onopen = () => {
console.log('WebSocket connection established.');
setConnectionStatus('Connected');
};
ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
// 这是关键:将数据处理委托给 MobX store
dataStore.updateDataPoints(data);
} catch (error) {
console.error('Error parsing WebSocket message:', error);
}
};
ws.onclose = () => {
console.log('WebSocket connection closed. Reconnecting...');
setConnectionStatus('Reconnecting...');
// 实现一个简单的重连机制
setTimeout(connect, 3000);
};
ws.onerror = (error) => {
console.error('WebSocket error:', error);
ws.close();
};
};
connect();
}
// 组件卸载时不需要清理,因为我们希望连接保持
// 除非应用逻辑需要
}, []);
return (
<main>
<title>Real-time Dashboard</title>
<h1>High-Frequency Data Stream</h1>
<p>Connection Status: {connectionStatus}</p>
<p>Total Items: {dataStore.itemCount}</p>
{/* DataGrid 组件将负责渲染 */}
<DataGrid />
</main>
);
};
// 使用 observer 包裹页面组件
export default observer(DashboardPage);
这段代码的核心在于 useEffect 钩子。它负责在客户端(typeof window !== 'undefined')建立 WebSocket 连接。当收到消息时,它不做任何复杂的 state 操作,而是直接调用 dataStore.updateDataPoints。这种职责分离是保持组件简洁和高性能的关键。
第三’步:用 MobX 驯服数据洪流
这是整个前端架构的灵魂。如果直接使用 React 的 useState 来处理每秒几百甚至上千次的更新,整个应用会因为根组件的反复渲染而卡死。我们需要一个能实现精细化、原子级更新的方案。
MobX 的工作原理正合此意。它通过追踪数据在何处被使用,来构建一个精确的依赖图。当一个 observable 数据变化时,只有依赖了这个特定数据的 observer 组件会重新渲染。
src/stores/DataStore.js:
import { makeAutoObservable, runInAction, observable } from 'mobx';
class DataStore {
// 使用 observable.map 是性能关键
// 它对键的增删改查操作是 O(1),且 MobX 对其做了深度优化
dataPoints = observable.map();
constructor() {
makeAutoObservable(this);
}
// 计算属性会被自动缓存,只有依赖项变化时才重新计算
get itemCount() {
return this.dataPoints.size;
}
// 这是一个 action,所有对 state 的修改都应在 action 中进行
updateDataPoints(points) {
// runInAction 会将多次 state 修改合并为一次“突变”
// 从而只触发一次UI更新,这是至关重要的性能优化
runInAction(() => {
if (!Array.isArray(points)) {
console.warn("Received non-array data:", points);
return;
}
points.forEach(point => {
// 如果已存在,则更新;否则,新增
// MobX 会精确地通知观察者
const existing = this.dataPoints.get(point.id);
if (existing) {
// 只更新变化的字段,避免不必要的对象重构
existing.value = point.value;
existing.timestamp = point.timestamp;
} else {
this.dataPoints.set(point.id, {
id: point.id,
value: point.value,
timestamp: point.timestamp,
});
}
});
});
}
}
const dataStore = new DataStore();
export default dataStore;
MobX store 的设计要点:
-
observable.map(): 对于一个由 ID 索引的集合,observable.map远比数组[]高效。当单个条目更新时,MobX 知道只需通知观察这个特定条目的组件,而不是整个列表。 -
runInAction: 后端一次性推送了50个数据点的更新。如果在forEach循环中每次set或修改都触发一次UI更新,那将是一场灾难。runInAction将这50次修改打包成一个原子操作,在所有修改完成后,才通知观察者进行一次渲染。这是应对批量更新的“银弹”。 -
get itemCount():computed属性(通过get关键字定义)是 MobX 的另一个法宝。它会被自动缓存,只有当this.dataPoints.size发生变化时,才会重新计算。
最后,是渲染组件的设计。我们需要将渲染压力分散到最小的单元。
src/components/DataGrid.js:
import React from 'react';
import { observer } from 'mobx-react-lite';
import dataStore from '../stores/DataStore';
import { useMemo } from 'react';
// 单个数据行的组件
const DataRow = observer(({ dataPoint }) => {
// 这个组件非常小,只依赖于单个 dataPoint 对象
// 当 dataPoint.value 或 dataPoint.timestamp 变化时,只有这个组件会重渲染
// 而不是整个 DataGrid
return (
<div style={{ display: 'flex', justifyContent: 'space-between', padding: '2px 5px' }}>
<span>{dataPoint.id}</span>
<span style={{ color: dataPoint.value > 500 ? 'green' : 'red' }}>
{dataPoint.value.toFixed(2)}
</span>
<span>{new Date(dataPoint.timestamp * 1000).toLocaleTimeString()}</span>
</div>
);
});
const DataGrid = observer(() => {
// 这里的 toJS(dataStore.dataPoints) 或 [...dataStore.dataPoints.values()]
// 是一个常见的反模式,因为它会在每次任何数据点变化时,
// 创建一个新数组,导致整个列表重新渲染。
// 正确的做法是直接迭代 observable map 的 key
const sortedKeys = useMemo(() => {
return Array.from(dataStore.dataPoints.keys()).sort();
}, [dataStore.itemCount]); // 只在条目数量变化时重新排序
return (
<div style={{ border: '1px solid #ccc', fontFamily: 'monospace', height: '400px', overflowY: 'auto' }}>
{sortedKeys.map(key => {
// 通过 key 从 map 中获取 observable 对象
// 并将其传递给子组件
const dataPoint = dataStore.dataPoints.get(key);
// React 的 key 属性必须是唯一的,这里的 id 正好
return <DataRow key={key} dataPoint={dataPoint} />;
})}
</div>
);
});
export default DataGrid;
DataGrid 组件的实现体现了 MobX 的最佳实践:
- 父组件只负责结构:
DataGrid组件自身只依赖dataStore.dataPoints.keys()。它负责遍历,但不关心每个数据项的具体值。 - 子组件负责渲染:
DataRow组件被observer包裹,并且只接收一个dataPoint对象。当dataStore更新了item_10的值,只有那个key为item_10的DataRow实例会重新渲染。页面上的其他99行纹丝不动。这就是性能的来源。 - 避免反模式: 我们没有将
dataPoints转换为普通数组再传给map。我们直接迭代keys,然后通过get(key)获取原始的observable对象,维持了 MobX 响应式链条的完整性。
最终的架构图如下:
graph TD
subgraph Browser
A[Gatsby Build] -- static HTML/JS --> B(Client Loads Static Shell);
B -- Hydration --> C{React App Takes Over};
C -- useEffect --> D[Establish WebSocket Conn];
D -- onmessage --> E[MobX Store Action: updateDataPoints];
E -- runInAction --> F[Observable Map Update];
F -- Fine-grained Notification --> G((DataRow Component Render));
H[DataGrid Component] --> G;
end
subgraph Server
I[Axum/Tokio Runtime] --> J{WebSocket Handler};
K[Data Generator Task] -- High-frequency data --> L[Broadcast to Clients via MPSC];
L --> J;
end
D <--> J;
局限与未来路径
这个架构虽然解决了最初的矛盾,但在生产环境中仍有需要完善的地方。
首先,前端的渲染性能瓶颈并未完全消失。当数据点增长到数千个时,即使每个 DataRow 的重渲染很快,DOM 中存在数千个节点本身就会带来性能压力。此时需要引入虚拟列表(Virtual List)技术,只渲染视口内可见的行。
其次,后端的数据生成是单点。在真实系统中,这可能是一个复杂的分布式流处理系统(如 Kafka -> Flink)。Axum 服务需要考虑如何优雅地处理上游的背压(backpressure)问题,以及自身的水平扩展。
最后,客户端的重连逻辑目前非常简单。一个生产级的实现需要更复杂的指数退避策略,并且在重连后需要一种机制(如通过 API 拉取快照)来弥补断连期间丢失的数据,保证状态的最终一致性。当前的方案在断连期间会丢失更新。