在Vercel平台利用AWS SQS与ISR构建处理长时任务的CQRS架构


一个看似简单的业务需求,往往是检验架构选型是否合理的试金石。例如,允许用户上传个人简历(CV),系统在后台进行深度解析(如技能提取、格式转换、AI评估),这个过程可能耗时30秒到数分钟。用户提交后,我们期望页面能立即响应,并最终在一个高性能的静态页面上展示解析结果。

如果整个系统构建在Vercel这样的Serverless平台上,挑战会变得更加尖锐。Vercel Functions的执行时间上限(Pro套餐为60秒)直接排除了在单个同步请求中完成整个处理流程的可能性。强行执行不仅会导致请求超时,还会造成极差的用户体验。这是一个典型的命令(Command)与查询(Query)负载不匹配的场景,也是引入CQRS(Command Query Responsibility Segregation)架构的绝佳切入点。

定义问题:同步模型的困境

我们首先审视一个直接但不可行的方案:单一Vercel Function同步处理。

// /api/cv/upload-sync.ts
// 这是一个错误示范,用于说明问题
import { NextApiRequest, NextApiResponse } from 'next';
import { formidable } from 'formidable';
import { parseCV, analyzeSkills, generateReport } from '@/lib/cv-processor'; // 耗时操作

export const config = {
  api: {
    bodyParser: false,
  },
};

export default async function handler(req: NextApiRequest, res: NextApiResponse) {
  if (req.method !== 'POST') {
    return res.status(405).json({ error: 'Method Not Allowed' });
  }

  const form = formidable({});
  const [, files] = await form.parse(req);
  const cvFile = files.cv?.[0];

  if (!cvFile) {
    return res.status(400).json({ error: 'CV file is required.' });
  }

  try {
    // 假设这些函数合计执行时间超过60秒
    const parsedData = await parseCV(cvFile.filepath);
    const skills = await analyzeSkills(parsedData);
    const report = await generateReport(skills);

    // 将结果存入数据库
    // await db.saveReport(report);

    return res.status(200).json({ report });
  } catch (error) {
    console.error('CV processing failed:', error);
    return res.status(500).json({ error: 'Internal Server Error during processing.' });
  }
}

这个模型的致命缺陷在于其同步阻塞的特性:

  1. 超时风险: 任何超过Vercel平台限制时长的处理都会导致请求失败,返回 504 Gateway Timeout
  2. 资源浪费: 一个HTTP处理线程被长时间占用,等待CPU密集型或IO密集型任务完成。这在Serverless按需计费模型下,成本效益极低。
  3. 缺乏韧性: 如果在处理中途发生瞬时错误(如第三方API抖动),整个任务失败,且没有内置的重试机制。用户必须重新上传。
  4. 糟糕的体验: 用户在浏览器前枯等,直到服务器返回结果或超时。

架构决策:基于SQS和ISR的异步CQRS方案

问题的本质是需要将写入操作(命令)与读取操作(查询)彻底解耦。

  • 命令(Command): “提交一份CV进行处理”的意图。它应该被快速接收、验证并排队,但不立即执行。
  • 查询(Query): “查看CV的处理结果”的请求。它应该访问一个为快速读取而优化的数据视图,该视图由后台处理任务最终更新。

基于此,我们选择的架构如下:

  1. 命令入口 (Vercel Edge Function): 一个轻量级的API端点接收CV上传。它的唯一职责是参数校验,生成一个唯一的任务ID,然后将任务元数据推送至AWS SQS (Simple Queue Service) 队列。完成后,它立即向客户端返回202 Accepted状态码和任务ID。整个过程耗时毫秒级。
  2. 命令队列 (AWS SQS): SQS作为命令总线和缓冲区。它保证了命令的持久化,提供了至少一次的交付语义,并支持死信队列(DLQ)来处理无法被消费的毒丸消息。这是系统韧性的核心。
  3. 命令处理器 (Vercel Cron-Triggered Function): 一个独立的、由定时任务(例如每分钟)触发的Vercel Function,作为SQS队列的消费者。它从队列中拉取消息,执行耗时的CV处理逻辑。这个Function不直接面向用户,因此它的执行时长不影响用户体验。
  4. 读模型存储 (Vercel KV): 处理完成后,结果被存储在一个为快速键值查询优化的数据库中,例如Vercel KV或Upstash Redis。
  5. 视图更新与查询 (Next.js with ISR): 用户的CV结果展示页面使用Next.js的增量静态再生(Incremental Static Regeneration, ISR)构建。命令处理器在完成任务后,会调用一个特定的Vercel API(Revalidation API)来触发对应页面的后台重新生成。用户访问时,始终能从Vercel的边缘网络中获取到高度缓存的静态HTML,实现了极致的读取性能。

下面是这个流程的图示:

sequenceDiagram
    participant Client
    participant Vercel Edge API as /api/cv/submit
    participant AWS SQS
    participant Vercel Cron Job as /api/cv/process
    participant Vercel KV as Read Model
    participant Next.js Page as /cv/[id]
    
    Client->>+Vercel Edge API: POST /api/cv/submit (CV file)
    Vercel Edge API->>Vercel Edge API: 1. Validate input & generate jobId
    Vercel Edge API->>+AWS SQS: 2. SendMessage(jobId, cv_location)
    AWS SQS-->>-Vercel Edge API: MessageId
    Vercel Edge API-->>-Client: 3. HTTP 202 Accepted { jobId }
    
    Note over Vercel Cron Job: Triggered periodically (e.g., every minute)
    
    loop Batch Processing
        Vercel Cron Job->>+AWS SQS: 4. ReceiveMessages()
        AWS SQS-->>-Vercel Cron Job: Messages (jobs)
        Vercel Cron Job->>Vercel Cron Job: 5. For each message: process CV (long task)
        Vercel Cron Job->>+Vercel KV: 6. Save processed result against jobId
        Vercel KV-->>-Vercel Cron Job: OK
        Vercel Cron Job->>+Next.js Page: 7. POST /api/revalidate (path: /cv/[jobId])
        Next.js Page-->>-Vercel Cron Job: Revalidation triggered
        Vercel Cron Job->>+AWS SQS: 8. DeleteMessage()
        AWS SQS-->>-Vercel Cron Job: OK
    end
    
    Client->>+Next.js Page: GET /cv/[id]
    Note over Next.js Page: Serves stale static page from Edge, triggers revalidation if needed
    Next.js Page-->>Client: Cached HTML (or "Processing" state)
    
    Note over Next.js Page: After revalidation, new static page is available at the Edge
    Client->>+Next.js Page: Subsequent GET /cv/[id]
    Next.js Page-->>Client: Freshly generated HTML with CV results

核心实现概览

1. 配置与环境变量

在真实项目中,所有敏感信息和配置都应通过环境变量管理。在Vercel项目中,这可以在项目设置中完成。

# .env.local
# AWS Credentials (ensure Vercel has IAM permissions for SQS)
AWS_ACCESS_KEY_ID="your_aws_access_key"
AWS_SECRET_ACCESS_KEY="your_aws_secret_key"
AWS_REGION="us-east-1"
SQS_QUEUE_URL="your_sqs_queue_url"

# Vercel KV (automatically configured when integrated)
KV_URL="..."
KV_REST_API_URL="..."
KV_REST_API_TOKEN="..."
KV_REST_API_READ_ONLY_TOKEN="..."

# Revalidation Secret
REVALIDATE_SECRET="a_very_strong_random_string"

2. 命令入口: /pages/api/cv/submit.ts

这个API端点需要做到极致的轻量和快速。我们使用Vercel Blob来存储上传的文件,然后将文件的URL推送到SQS。

// /pages/api/cv/submit.ts
import { type NextRequest, NextResponse } from 'next/server';
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';
import { put as vercelBlobPut } from '@vercel/blob';
import { nanoid } from 'nanoid';

// Vercel Edge Runtime is preferred for this endpoint for low latency
export const runtime = 'edge';

const sqsClient = new SQSClient({
  region: process.env.AWS_REGION!,
  credentials: {
    accessKeyId: process.env.AWS_ACCESS_KEY_ID!,
    secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY!,
  },
});

export default async function handler(req: NextRequest) {
  if (req.method !== 'POST') {
    return new NextResponse('Method Not Allowed', { status: 405 });
  }

  const formData = await req.formData();
  const file = formData.get('cv') as File | null;

  if (!file || file.size === 0) {
    return NextResponse.json({ error: 'CV file is required.' }, { status: 400 });
  }

  // A common mistake is to process the file here. 
  // Instead, we offload it immediately.
  const jobId = nanoid();
  const pathname = `cvs/${jobId}-${file.name}`;

  try {
    // 1. Upload file to a persistent storage like Vercel Blob or S3
    const blob = await vercelBlobPut(pathname, file, {
      access: 'public',
    });

    // 2. Send a message to SQS with job details
    const command = new SendMessageCommand({
      QueueUrl: process.env.SQS_QUEUE_URL!,
      MessageBody: JSON.stringify({
        jobId,
        fileUrl: blob.url,
        fileName: file.name,
        submittedAt: new Date().toISOString(),
      }),
      // Using MessageGroupId for FIFO queues if needed, but not essential for this use case
      MessageGroupId: 'cv-processing', 
      MessageDeduplicationId: jobId, // Prevents duplicate messages for 5 mins
    });

    await sqsClient.send(command);

    // 3. Immediately respond to the client
    return NextResponse.json({ 
      message: 'CV submission accepted for processing.',
      jobId: jobId,
      statusUrl: `/cv/${jobId}`
    }, { status: 202 });

  } catch (error) {
    console.error('Submission failed:', error);
    // In a real project, implement more robust error logging (e.g., to DataDog, Sentry)
    return NextResponse.json({ error: 'Failed to accept CV submission.' }, { status: 500 });
  }
}

3. 命令处理器: /pages/api/cv/process.ts

这个端点是后台工作的核心。它不应被公开访问,而是由Vercel的Cron Job触发。

配置 vercel.json 触发Cron Job:

{
  "crons": [
    {
      "path": "/api/cv/process",
      "schedule": "* * * * *"
    }
  ]
}

注意: Vercel Cron的最小调度间隔是1分钟。对于需要更低延迟的系统,应考虑使用由SQS事件直接触发的AWS Lambda。这里的方案是一个在Vercel生态内完整的、但存在延迟妥协的实现。

// /pages/api/cv/process.ts
import { NextApiRequest, NextApiResponse } from 'next';
import { SQSClient, ReceiveMessageCommand, DeleteMessageCommand } from '@aws-sdk/client-sqs';
import { kv } from '@vercel/kv';

// Mock long-running process
const processCV = (fileUrl: string) => new Promise(resolve => {
  console.log(`Processing CV from ${fileUrl}...`);
  setTimeout(() => {
    console.log('Processing finished.');
    resolve({
      extractedText: "...",
      skills: ["TypeScript", "React", "AWS", "CQRS"],
      score: 95,
    });
  }, 30 * 1000); // Simulate a 30-second task
});

const sqsClient = new SQSClient({
  region: process.env.AWS_REGION,
  credentials: {
    accessKeyId: process.env.AWS_ACCESS_KEY_ID!,
    secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY!,
  },
});

export default async function handler(req: NextApiRequest, res: NextApiResponse) {
  // Secure the endpoint: only allow Vercel's Cron or internal calls
  // In production, this should be a more robust check (e.g., secret token in header)
  if (req.headers['user-agent'] !== 'Vercel-Cron/1.0') {
     // return res.status(401).send('Unauthorized');
  }

  try {
    const receiveCommand = new ReceiveMessageCommand({
      QueueUrl: process.env.SQS_QUEUE_URL,
      MaxNumberOfMessages: 5, // Process up to 5 messages per invocation
      WaitTimeSeconds: 10,    // Use long polling to reduce empty receives
    });

    const { Messages } = await sqsClient.send(receiveCommand);

    if (!Messages || Messages.length === 0) {
      return res.status(200).json({ message: 'No new messages to process.' });
    }

    for (const message of Messages) {
      if (!message.Body || !message.ReceiptHandle) continue;

      try {
        const { jobId, fileUrl } = JSON.parse(message.Body);
        
        // Initial state update
        await kv.set(`cv:${jobId}`, { status: 'processing', submittedAt: new Date().toISOString() });

        // The actual long-running task
        const result = await processCV(fileUrl);
        
        // Update read model with final result
        await kv.set(`cv:${jobId}`, { status: 'completed', data: result, completedAt: new Date().toISOString() });

        // Trigger ISR for the result page
        await fetch(`${req.headers['x-forwarded-proto']}://${req.headers.host}/api/revalidate?secret=${process.env.REVALIDATE_SECRET}&path=/cv/${jobId}`);

        // IMPORTANT: Delete the message from the queue after successful processing
        const deleteCommand = new DeleteMessageCommand({
          QueueUrl: process.env.SQS_QUEUE_URL,
          ReceiptHandle: message.ReceiptHandle,
        });
        await sqsClient.send(deleteCommand);

      } catch (processingError) {
        console.error('Failed to process message:', message.MessageId, processingError);
        // Do not delete the message. SQS will make it visible again after the VisibilityTimeout.
        // After configured retries, it will go to the Dead-Letter Queue (DLQ).
      }
    }
    
    return res.status(200).json({ message: `Processed ${Messages.length} messages.` });

  } catch (error) {
    console.error('SQS processing loop failed:', error);
    return res.status(500).json({ error: 'Failed to process SQS queue.' });
  }
}

4. 视图与ISR: /pages/cv/[jobId].tsx

这个页面负责展示结果。getStaticProps 结合 revalidate 属性是ISR的核心。

// /pages/cv/[jobId].tsx
import { GetStaticPaths, GetStaticProps, NextPage } from 'next';
import { kv } from '@vercel/kv';

type CVResult = {
  status: 'processing' | 'completed' | 'failed';
  data?: {
    skills: string[];
    score: number;
  };
  submittedAt?: string;
  completedAt?: string;
};

interface PageProps {
  jobId: string;
  result: CVResult | null;
}

const CVResultPage: NextPage<PageProps> = ({ jobId, result }) => {
  if (!result || result.status === 'processing') {
    return (
      <div>
        <h1>CV Analysis for Job ID: {jobId}</h1>
        <p>Your CV is currently being processed. This page will automatically update once it's complete.</p>
        {/* For better UX, add a client-side polling mechanism here as a fallback */}
      </div>
    );
  }

  if (result.status === 'failed') {
    return (
      <div>
        <h1>Processing Failed for Job ID: {jobId}</h1>
        <p>We're sorry, an error occurred while processing your CV.</p>
      </div>
    );
  }
  
  return (
    <div>
      <h1>Analysis Complete for Job ID: {jobId}</h1>
      <p>Overall Score: {result.data?.score}</p>
      <h2>Detected Skills:</h2>
      <ul>
        {result.data?.skills.map(skill => <li key={skill}>{skill}</li>)}
      </ul>
    </div>
  );
};

export const getStaticPaths: GetStaticPaths = async () => {
  // We don't pre-build any CV pages at build time.
  // They are all generated on-demand.
  return {
    paths: [],
    fallback: 'blocking', // or 'true' for a non-blocking experience with loading states
  };
};

export const getStaticProps: GetStaticProps<PageProps> = async (context) => {
  const jobId = context.params?.jobId as string;

  if (!jobId) {
    return { notFound: true };
  }

  const result = await kv.get<CVResult>(`cv:${jobId}`);

  return {
    props: {
      jobId,
      result: result ?? null,
    },
    // Revalidate every 60 minutes, but primarily updated via on-demand revalidation.
    revalidate: 3600, 
  };
};

export default CVResultPage;

最后,是负责触发按需ISR的API端点。

// /pages/api/revalidate.ts
import { NextApiRequest, NextApiResponse } from 'next';

export default async function handler(req: NextApiRequest, res: NextApiResponse) {
  if (req.query.secret !== process.env.REVALIDATE_SECRET) {
    return res.status(401).json({ message: 'Invalid token' });
  }

  const path = req.query.path as string;
  if (!path) {
    return res.status(400).json({ message: 'Path is required' });
  }

  try {
    // This will regenerate the static page for the given path
    await res.revalidate(path);
    return res.json({ revalidated: true });
  } catch (err) {
    return res.status(500).send('Error revalidating');
  }
}

架构的局限性与未来迭代

这个基于Vercel和AWS SQS的CQRS架构优雅地解决了长时任务与即时响应之间的矛盾,但在真实生产环境中,它并非没有权衡。

  1. 处理延迟: Vercel Cron的分钟级调度引入了固有的延迟。从消息进入队列到被处理,最长可能有1分钟的等待时间。对于需要近实时处理的场景,将命令处理器迁移到由SQS事件直接触发的AWS Lambda是必然的选择。这打破了纯Vercel生态,但获得了性能和响应速度的巨大提升。
  2. 可观测性: 链路被拆分到了API、SQS、后台Function等多个部分,故障排查变得复杂。一个jobId必须作为关联ID贯穿整个日志系统。引入OpenTelemetry等分布式追踪方案,对于维护这种分布式系统至关重要。
  3. 最终一致性: 用户在任务完成和页面ISR生效之间可能会看到“处理中”的状态。这是最终一致性架构的正常表现,但需要通过前端技术(如短轮询或WebSocket)来优化用户体验,以便在数据就绪时主动通知客户端刷新,而不是依赖用户手动刷新。
  4. 死信队列管理: SQS消息在多次处理失败后会进入DLQ。当前方案没有包含DLQ的处理逻辑。生产系统必须有一个监控DLQ的机制,并有相应的流程(自动或手动)来重试或分析这些失败的命令。

尽管存在这些局限,该架构为在Serverless范式下构建健壮、可扩展的异步系统提供了一个扎实的模板。它体现了现代云原生应用设计的核心思想:通过解耦和职责分离来换取系统的韧性与水平扩展能力。


  目录