一个看似简单的业务需求,往往是检验架构选型是否合理的试金石。例如,允许用户上传个人简历(CV),系统在后台进行深度解析(如技能提取、格式转换、AI评估),这个过程可能耗时30秒到数分钟。用户提交后,我们期望页面能立即响应,并最终在一个高性能的静态页面上展示解析结果。
如果整个系统构建在Vercel这样的Serverless平台上,挑战会变得更加尖锐。Vercel Functions的执行时间上限(Pro套餐为60秒)直接排除了在单个同步请求中完成整个处理流程的可能性。强行执行不仅会导致请求超时,还会造成极差的用户体验。这是一个典型的命令(Command)与查询(Query)负载不匹配的场景,也是引入CQRS(Command Query Responsibility Segregation)架构的绝佳切入点。
定义问题:同步模型的困境
我们首先审视一个直接但不可行的方案:单一Vercel Function同步处理。
// /api/cv/upload-sync.ts
// 这是一个错误示范,用于说明问题
import { NextApiRequest, NextApiResponse } from 'next';
import { formidable } from 'formidable';
import { parseCV, analyzeSkills, generateReport } from '@/lib/cv-processor'; // 耗时操作
export const config = {
api: {
bodyParser: false,
},
};
export default async function handler(req: NextApiRequest, res: NextApiResponse) {
if (req.method !== 'POST') {
return res.status(405).json({ error: 'Method Not Allowed' });
}
const form = formidable({});
const [, files] = await form.parse(req);
const cvFile = files.cv?.[0];
if (!cvFile) {
return res.status(400).json({ error: 'CV file is required.' });
}
try {
// 假设这些函数合计执行时间超过60秒
const parsedData = await parseCV(cvFile.filepath);
const skills = await analyzeSkills(parsedData);
const report = await generateReport(skills);
// 将结果存入数据库
// await db.saveReport(report);
return res.status(200).json({ report });
} catch (error) {
console.error('CV processing failed:', error);
return res.status(500).json({ error: 'Internal Server Error during processing.' });
}
}
这个模型的致命缺陷在于其同步阻塞的特性:
- 超时风险: 任何超过Vercel平台限制时长的处理都会导致请求失败,返回
504 Gateway Timeout
。 - 资源浪费: 一个HTTP处理线程被长时间占用,等待CPU密集型或IO密集型任务完成。这在Serverless按需计费模型下,成本效益极低。
- 缺乏韧性: 如果在处理中途发生瞬时错误(如第三方API抖动),整个任务失败,且没有内置的重试机制。用户必须重新上传。
- 糟糕的体验: 用户在浏览器前枯等,直到服务器返回结果或超时。
架构决策:基于SQS和ISR的异步CQRS方案
问题的本质是需要将写入操作(命令)与读取操作(查询)彻底解耦。
- 命令(Command): “提交一份CV进行处理”的意图。它应该被快速接收、验证并排队,但不立即执行。
- 查询(Query): “查看CV的处理结果”的请求。它应该访问一个为快速读取而优化的数据视图,该视图由后台处理任务最终更新。
基于此,我们选择的架构如下:
- 命令入口 (Vercel Edge Function): 一个轻量级的API端点接收CV上传。它的唯一职责是参数校验,生成一个唯一的任务ID,然后将任务元数据推送至AWS SQS (Simple Queue Service) 队列。完成后,它立即向客户端返回
202 Accepted
状态码和任务ID。整个过程耗时毫秒级。 - 命令队列 (AWS SQS): SQS作为命令总线和缓冲区。它保证了命令的持久化,提供了至少一次的交付语义,并支持死信队列(DLQ)来处理无法被消费的毒丸消息。这是系统韧性的核心。
- 命令处理器 (Vercel Cron-Triggered Function): 一个独立的、由定时任务(例如每分钟)触发的Vercel Function,作为SQS队列的消费者。它从队列中拉取消息,执行耗时的CV处理逻辑。这个Function不直接面向用户,因此它的执行时长不影响用户体验。
- 读模型存储 (Vercel KV): 处理完成后,结果被存储在一个为快速键值查询优化的数据库中,例如Vercel KV或Upstash Redis。
- 视图更新与查询 (Next.js with ISR): 用户的CV结果展示页面使用Next.js的增量静态再生(Incremental Static Regeneration, ISR)构建。命令处理器在完成任务后,会调用一个特定的Vercel API(Revalidation API)来触发对应页面的后台重新生成。用户访问时,始终能从Vercel的边缘网络中获取到高度缓存的静态HTML,实现了极致的读取性能。
下面是这个流程的图示:
sequenceDiagram participant Client participant Vercel Edge API as /api/cv/submit participant AWS SQS participant Vercel Cron Job as /api/cv/process participant Vercel KV as Read Model participant Next.js Page as /cv/[id] Client->>+Vercel Edge API: POST /api/cv/submit (CV file) Vercel Edge API->>Vercel Edge API: 1. Validate input & generate jobId Vercel Edge API->>+AWS SQS: 2. SendMessage(jobId, cv_location) AWS SQS-->>-Vercel Edge API: MessageId Vercel Edge API-->>-Client: 3. HTTP 202 Accepted { jobId } Note over Vercel Cron Job: Triggered periodically (e.g., every minute) loop Batch Processing Vercel Cron Job->>+AWS SQS: 4. ReceiveMessages() AWS SQS-->>-Vercel Cron Job: Messages (jobs) Vercel Cron Job->>Vercel Cron Job: 5. For each message: process CV (long task) Vercel Cron Job->>+Vercel KV: 6. Save processed result against jobId Vercel KV-->>-Vercel Cron Job: OK Vercel Cron Job->>+Next.js Page: 7. POST /api/revalidate (path: /cv/[jobId]) Next.js Page-->>-Vercel Cron Job: Revalidation triggered Vercel Cron Job->>+AWS SQS: 8. DeleteMessage() AWS SQS-->>-Vercel Cron Job: OK end Client->>+Next.js Page: GET /cv/[id] Note over Next.js Page: Serves stale static page from Edge, triggers revalidation if needed Next.js Page-->>Client: Cached HTML (or "Processing" state) Note over Next.js Page: After revalidation, new static page is available at the Edge Client->>+Next.js Page: Subsequent GET /cv/[id] Next.js Page-->>Client: Freshly generated HTML with CV results
核心实现概览
1. 配置与环境变量
在真实项目中,所有敏感信息和配置都应通过环境变量管理。在Vercel项目中,这可以在项目设置中完成。
# .env.local
# AWS Credentials (ensure Vercel has IAM permissions for SQS)
AWS_ACCESS_KEY_ID="your_aws_access_key"
AWS_SECRET_ACCESS_KEY="your_aws_secret_key"
AWS_REGION="us-east-1"
SQS_QUEUE_URL="your_sqs_queue_url"
# Vercel KV (automatically configured when integrated)
KV_URL="..."
KV_REST_API_URL="..."
KV_REST_API_TOKEN="..."
KV_REST_API_READ_ONLY_TOKEN="..."
# Revalidation Secret
REVALIDATE_SECRET="a_very_strong_random_string"
2. 命令入口: /pages/api/cv/submit.ts
这个API端点需要做到极致的轻量和快速。我们使用Vercel Blob来存储上传的文件,然后将文件的URL推送到SQS。
// /pages/api/cv/submit.ts
import { type NextRequest, NextResponse } from 'next/server';
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';
import { put as vercelBlobPut } from '@vercel/blob';
import { nanoid } from 'nanoid';
// Vercel Edge Runtime is preferred for this endpoint for low latency
export const runtime = 'edge';
const sqsClient = new SQSClient({
region: process.env.AWS_REGION!,
credentials: {
accessKeyId: process.env.AWS_ACCESS_KEY_ID!,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY!,
},
});
export default async function handler(req: NextRequest) {
if (req.method !== 'POST') {
return new NextResponse('Method Not Allowed', { status: 405 });
}
const formData = await req.formData();
const file = formData.get('cv') as File | null;
if (!file || file.size === 0) {
return NextResponse.json({ error: 'CV file is required.' }, { status: 400 });
}
// A common mistake is to process the file here.
// Instead, we offload it immediately.
const jobId = nanoid();
const pathname = `cvs/${jobId}-${file.name}`;
try {
// 1. Upload file to a persistent storage like Vercel Blob or S3
const blob = await vercelBlobPut(pathname, file, {
access: 'public',
});
// 2. Send a message to SQS with job details
const command = new SendMessageCommand({
QueueUrl: process.env.SQS_QUEUE_URL!,
MessageBody: JSON.stringify({
jobId,
fileUrl: blob.url,
fileName: file.name,
submittedAt: new Date().toISOString(),
}),
// Using MessageGroupId for FIFO queues if needed, but not essential for this use case
MessageGroupId: 'cv-processing',
MessageDeduplicationId: jobId, // Prevents duplicate messages for 5 mins
});
await sqsClient.send(command);
// 3. Immediately respond to the client
return NextResponse.json({
message: 'CV submission accepted for processing.',
jobId: jobId,
statusUrl: `/cv/${jobId}`
}, { status: 202 });
} catch (error) {
console.error('Submission failed:', error);
// In a real project, implement more robust error logging (e.g., to DataDog, Sentry)
return NextResponse.json({ error: 'Failed to accept CV submission.' }, { status: 500 });
}
}
3. 命令处理器: /pages/api/cv/process.ts
这个端点是后台工作的核心。它不应被公开访问,而是由Vercel的Cron Job触发。
配置 vercel.json
触发Cron Job:
{
"crons": [
{
"path": "/api/cv/process",
"schedule": "* * * * *"
}
]
}
注意: Vercel Cron的最小调度间隔是1分钟。对于需要更低延迟的系统,应考虑使用由SQS事件直接触发的AWS Lambda。这里的方案是一个在Vercel生态内完整的、但存在延迟妥协的实现。
// /pages/api/cv/process.ts
import { NextApiRequest, NextApiResponse } from 'next';
import { SQSClient, ReceiveMessageCommand, DeleteMessageCommand } from '@aws-sdk/client-sqs';
import { kv } from '@vercel/kv';
// Mock long-running process
const processCV = (fileUrl: string) => new Promise(resolve => {
console.log(`Processing CV from ${fileUrl}...`);
setTimeout(() => {
console.log('Processing finished.');
resolve({
extractedText: "...",
skills: ["TypeScript", "React", "AWS", "CQRS"],
score: 95,
});
}, 30 * 1000); // Simulate a 30-second task
});
const sqsClient = new SQSClient({
region: process.env.AWS_REGION,
credentials: {
accessKeyId: process.env.AWS_ACCESS_KEY_ID!,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY!,
},
});
export default async function handler(req: NextApiRequest, res: NextApiResponse) {
// Secure the endpoint: only allow Vercel's Cron or internal calls
// In production, this should be a more robust check (e.g., secret token in header)
if (req.headers['user-agent'] !== 'Vercel-Cron/1.0') {
// return res.status(401).send('Unauthorized');
}
try {
const receiveCommand = new ReceiveMessageCommand({
QueueUrl: process.env.SQS_QUEUE_URL,
MaxNumberOfMessages: 5, // Process up to 5 messages per invocation
WaitTimeSeconds: 10, // Use long polling to reduce empty receives
});
const { Messages } = await sqsClient.send(receiveCommand);
if (!Messages || Messages.length === 0) {
return res.status(200).json({ message: 'No new messages to process.' });
}
for (const message of Messages) {
if (!message.Body || !message.ReceiptHandle) continue;
try {
const { jobId, fileUrl } = JSON.parse(message.Body);
// Initial state update
await kv.set(`cv:${jobId}`, { status: 'processing', submittedAt: new Date().toISOString() });
// The actual long-running task
const result = await processCV(fileUrl);
// Update read model with final result
await kv.set(`cv:${jobId}`, { status: 'completed', data: result, completedAt: new Date().toISOString() });
// Trigger ISR for the result page
await fetch(`${req.headers['x-forwarded-proto']}://${req.headers.host}/api/revalidate?secret=${process.env.REVALIDATE_SECRET}&path=/cv/${jobId}`);
// IMPORTANT: Delete the message from the queue after successful processing
const deleteCommand = new DeleteMessageCommand({
QueueUrl: process.env.SQS_QUEUE_URL,
ReceiptHandle: message.ReceiptHandle,
});
await sqsClient.send(deleteCommand);
} catch (processingError) {
console.error('Failed to process message:', message.MessageId, processingError);
// Do not delete the message. SQS will make it visible again after the VisibilityTimeout.
// After configured retries, it will go to the Dead-Letter Queue (DLQ).
}
}
return res.status(200).json({ message: `Processed ${Messages.length} messages.` });
} catch (error) {
console.error('SQS processing loop failed:', error);
return res.status(500).json({ error: 'Failed to process SQS queue.' });
}
}
4. 视图与ISR: /pages/cv/[jobId].tsx
这个页面负责展示结果。getStaticProps
结合 revalidate
属性是ISR的核心。
// /pages/cv/[jobId].tsx
import { GetStaticPaths, GetStaticProps, NextPage } from 'next';
import { kv } from '@vercel/kv';
type CVResult = {
status: 'processing' | 'completed' | 'failed';
data?: {
skills: string[];
score: number;
};
submittedAt?: string;
completedAt?: string;
};
interface PageProps {
jobId: string;
result: CVResult | null;
}
const CVResultPage: NextPage<PageProps> = ({ jobId, result }) => {
if (!result || result.status === 'processing') {
return (
<div>
<h1>CV Analysis for Job ID: {jobId}</h1>
<p>Your CV is currently being processed. This page will automatically update once it's complete.</p>
{/* For better UX, add a client-side polling mechanism here as a fallback */}
</div>
);
}
if (result.status === 'failed') {
return (
<div>
<h1>Processing Failed for Job ID: {jobId}</h1>
<p>We're sorry, an error occurred while processing your CV.</p>
</div>
);
}
return (
<div>
<h1>Analysis Complete for Job ID: {jobId}</h1>
<p>Overall Score: {result.data?.score}</p>
<h2>Detected Skills:</h2>
<ul>
{result.data?.skills.map(skill => <li key={skill}>{skill}</li>)}
</ul>
</div>
);
};
export const getStaticPaths: GetStaticPaths = async () => {
// We don't pre-build any CV pages at build time.
// They are all generated on-demand.
return {
paths: [],
fallback: 'blocking', // or 'true' for a non-blocking experience with loading states
};
};
export const getStaticProps: GetStaticProps<PageProps> = async (context) => {
const jobId = context.params?.jobId as string;
if (!jobId) {
return { notFound: true };
}
const result = await kv.get<CVResult>(`cv:${jobId}`);
return {
props: {
jobId,
result: result ?? null,
},
// Revalidate every 60 minutes, but primarily updated via on-demand revalidation.
revalidate: 3600,
};
};
export default CVResultPage;
最后,是负责触发按需ISR的API端点。
// /pages/api/revalidate.ts
import { NextApiRequest, NextApiResponse } from 'next';
export default async function handler(req: NextApiRequest, res: NextApiResponse) {
if (req.query.secret !== process.env.REVALIDATE_SECRET) {
return res.status(401).json({ message: 'Invalid token' });
}
const path = req.query.path as string;
if (!path) {
return res.status(400).json({ message: 'Path is required' });
}
try {
// This will regenerate the static page for the given path
await res.revalidate(path);
return res.json({ revalidated: true });
} catch (err) {
return res.status(500).send('Error revalidating');
}
}
架构的局限性与未来迭代
这个基于Vercel和AWS SQS的CQRS架构优雅地解决了长时任务与即时响应之间的矛盾,但在真实生产环境中,它并非没有权衡。
- 处理延迟: Vercel Cron的分钟级调度引入了固有的延迟。从消息进入队列到被处理,最长可能有1分钟的等待时间。对于需要近实时处理的场景,将命令处理器迁移到由SQS事件直接触发的AWS Lambda是必然的选择。这打破了纯Vercel生态,但获得了性能和响应速度的巨大提升。
- 可观测性: 链路被拆分到了API、SQS、后台Function等多个部分,故障排查变得复杂。一个
jobId
必须作为关联ID贯穿整个日志系统。引入OpenTelemetry等分布式追踪方案,对于维护这种分布式系统至关重要。 - 最终一致性: 用户在任务完成和页面ISR生效之间可能会看到“处理中”的状态。这是最终一致性架构的正常表现,但需要通过前端技术(如短轮询或WebSocket)来优化用户体验,以便在数据就绪时主动通知客户端刷新,而不是依赖用户手动刷新。
- 死信队列管理: SQS消息在多次处理失败后会进入DLQ。当前方案没有包含DLQ的处理逻辑。生产系统必须有一个监控DLQ的机制,并有相应的流程(自动或手动)来重试或分析这些失败的命令。
尽管存在这些局限,该架构为在Serverless范式下构建健壮、可扩展的异步系统提供了一个扎实的模板。它体现了现代云原生应用设计的核心思想:通过解耦和职责分离来换取系统的韧性与水平扩展能力。