使用Packer构建集成了Cypress与两阶段提交协议的不可变容器化测试环境


E2E测试套件的稳定性是我们支付核心模块的阿喀琉斯之踵。这个模块依赖一个经典的分布式事务协议——两阶段提交(2PC)来确保订单、库存和账单服务的数据一致性。问题在于,CI流水线中的测试环境极其脆弱。服务启动顺序、网络延迟、甚至是不同CI节点上Docker版本的细微差异,都能导致测试随机失败。调试这些失败堪称噩梦,因为你永远无法确定问题出在我们的代码,还是那个飘忽不定的环境本身。

最初的方案是在CI脚本里用docker-compose拉起整个环境。这在本地看似可行,但到了生产CI中就暴露了诸多问题:镜像拉取耗时、依赖服务健康检查逻辑复杂、启动脚本的幂等性难以保证。我们需要一个能将整个多服务测试环境,连同测试执行器(Cypress)本身,打包成一个原子化、不可变的单元的方案。这个单元应该像一个二进制文件一样,在任何地方执行都能得到完全相同的结果。

这就是引入Packer的契机。Packer的核心思想是创建“黄金镜像”。我们不应该在运行时去配置环境,而是在构建时就将一个完全配置好、预热完毕的环境“烘焙”成一个镜像。我们的目标是创建一个Docker镜像,它内部已经运行了所有参与2PC的微服务,并且包含了Cypress测试套件。运行这个镜像的唯一目的,就是执行测试并报告结果。

模拟两阶段提交的微服务体系

为了验证这套体系,我们首先需要一个最小化的、能够模拟2PC流程的系统。它由三个Go服务构成:

  1. Coordinator (协调者): 接收外部请求,驱动整个2PC流程。
  2. OrderService (参与者1): 管理订单状态。
  3. StockService (参与者2): 管理库存状态。

通信协议使用简单的HTTP/JSON。

Coordinator (coordinator/main.go)

package main

import (
    "bytes"
    "encoding/json"
    "log"
    "net/http"
    "sync"
    "time"
)

// 全局状态,用于Cypress查询
var transactionState = "INIT"
var transactionID string

type TransactionParticipant struct {
    Name        string `json:"name"`
    PrepareURL  string `json:"prepare_url"`
    CommitURL   string `json:"commit_url"`
    RollbackURL string `json:"rollback_url"`
}

var participants = []TransactionParticipant{
    {Name: "OrderService", PrepareURL: "http://order_service:8081/prepare", CommitURL: "http://order_service:8081/commit", RollbackURL: "http://order_service:8081/rollback"},
    {Name: "StockService", PrepareURL: "http://stock_service:8082/prepare", CommitURL: "http://stock_service:8082/commit", RollbackURL: "http://stock_service:8082/rollback"},
}

func main() {
    http.HandleFunc("/start-transaction", handleTransaction)
    http.HandleFunc("/state", func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "application/json")
        json.NewEncoder(w).Encode(map[string]string{"state": transactionState})
    })
    log.Println("Coordinator listening on :8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}

func handleTransaction(w http.ResponseWriter, r *http.Request) {
    transactionID = time.Now().Format(time.RFC3339Nano)
    log.Printf("Starting transaction: %s\n", transactionID)
    transactionState = "PENDING"

    // Phase 1: Prepare
    var wg sync.WaitGroup
    // 使用channel来收集投票结果,避免在并发goroutine中直接修改共享变量
    votes := make(chan bool, len(participants))

    for _, p := range participants {
        wg.Add(1)
        go func(participant TransactionParticipant) {
            defer wg.Done()
            payload := map[string]string{"tx_id": transactionID}
            body, _ := json.Marshal(payload)
            resp, err := http.Post(participant.PrepareURL, "application/json", bytes.NewBuffer(body))
            
            // 真实的错误处理会更复杂,这里简化
            if err != nil || resp.StatusCode != http.StatusOK {
                log.Printf("Participant %s failed to prepare. Error: %v\n", participant.Name, err)
                votes <- false
                return
            }
            log.Printf("Participant %s prepared successfully.\n", participant.Name)
            votes <- true
        }(p)
    }

    wg.Wait()
    close(votes)

    // 检查所有投票结果
    allVotedYes := true
    for vote := range votes {
        if !vote {
            allVotedYes = false
            break
        }
    }

    // Phase 2: Commit/Rollback
    if allVotedYes {
        log.Println("All participants voted YES. Committing...")
        transactionState = "COMMITTING"
        broadcast("commit")
        transactionState = "COMMITTED"
        w.WriteHeader(http.StatusOK)
        w.Write([]byte("Transaction Committed"))
    } else {
        log.Println("Some participants voted NO. Rolling back...")
        transactionState = "ROLLING_BACK"
        broadcast("rollback")
        transactionState = "ROLLED_BACK"
        w.WriteHeader(http.StatusInternalServerError)
        w.Write([]byte("Transaction Rolled Back"))
    }
}

func broadcast(action string) {
    var wg sync.WaitGroup
    for _, p := range participants {
        wg.Add(1)
        go func(participant TransactionParticipant) {
            defer wg.Done()
            var url string
            switch action {
            case "commit":
                url = participant.CommitURL
            case "rollback":
                url = participant.RollbackURL
            }
            payload := map[string]string{"tx_id": transactionID}
            body, _ := json.Marshal(payload)
            // 在生产环境中,需要重试和错误处理逻辑
            _, err := http.Post(url, "application/json", bytes.NewBuffer(body))
            if err != nil {
                log.Printf("Failed to send %s to %s: %v\n", action, participant.Name, err)
            }
        }(p)
    }
    wg.Wait()
}

Participant (order_service/main.go, stock_service/main.go 逻辑类似)

package main

import (
    "encoding/json"
    "log"
    "net/http"
    "sync"
)

// 使用sync.Map保证并发安全
var state = &sync.Map{} // Key: tx_id, Value: "prepared" | "committed" | "rolled_back"

func main() {
    // 注入一个环境变量来控制该服务是否在prepare阶段失败,用于测试回滚场景
    // os.Getenv("FORCE_PREPARE_FAILURE")
    
    http.HandleFunc("/prepare", handlePrepare)
    http.HandleFunc("/commit", handleCommit)
    http.HandleFunc("/rollback", handleRollback)
    http.HandleFunc("/state", handleState)

    serviceName := "OrderService"
    port := ":8081"
    log.Printf("%s listening on %s\n", serviceName, port)
    log.Fatal(http.ListenAndServe(port, nil))
}

func handlePrepare(w http.ResponseWriter, r *http.Request) {
    var payload map[string]string
    json.NewDecoder(r.Body).Decode(&payload)
    txID := payload["tx_id"]
    log.Printf("Received prepare for tx: %s\n", txID)
    
    // 模拟业务逻辑检查,这里简单地设置为成功
    state.Store(txID, "prepared")
    w.WriteHeader(http.StatusOK)
}

func handleCommit(w http.ResponseWriter, r *http.Request) {
    var payload map[string]string
    json.NewDecoder(r.Body).Decode(&payload)
    txID := payload["tx_id"]
    log.Printf("Received commit for tx: %s\n", txID)
    state.Store(txID, "committed")
    w.WriteHeader(http.StatusOK)
}

func handleRollback(w http.ResponseWriter, r *http.Request) {
    var payload map[string]string
    json.NewDecoder(r.Body).Decode(&payload)
    txID := payload["tx_id"]
    log.Printf("Received rollback for tx: %s\n", txID)
    state.Store(txID, "rolled_back")
    w.WriteHeader(http.StatusOK)
}

func handleState(w http.ResponseWriter, r *http.Request) {
    txID := r.URL.Query().Get("tx_id")
    current, ok := state.Load(txID)
    if !ok {
        http.NotFound(w, r)
        return
    }
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(map[string]string{"state": current.(string)})
}

使用 docker-compose 编排内部环境

Packer在构建镜像时,会启动一个临时的容器。我们需要在这个容器内使用 docker-compose 来拉起并固化我们的微服务体系。

docker-compose.yml

version: '3.8'

services:
  coordinator:
    build: ./coordinator
    ports:
      - "8080:8080"
    networks:
      - 2pc-net

  order_service:
    build: ./order_service
    ports:
      - "8081:8081"
    networks:
      - 2pc-net
    # 用于测试回滚场景
    # environment:
    #   - FORCE_PREPARE_FAILURE=true

  stock_service:
    build: ./stock_service
    ports:
      - "8082:8082"
    networks:
      - 2pc-net

networks:
  2pc-net:
    driver: bridge

关键所在:Packer 构建脚本

packer.pkr.hcl 是整个方案的核心。它定义了如何从一个基础镜像开始,一步步地将我们的应用、依赖、Cypress测试套件和启动逻辑全部打包进去。

packer {
  required_plugins {
    docker = {
      version = ">= 1.0.8"
      source  = "github.com/hashicorp/docker"
    }
  }
}

variable "base_image" {
  type    = string
  default = "cypress/included:12.17.4" // 使用官方带浏览器的Cypress镜像作为基础
}

variable "image_tag" {
  type    = string
  default = "2pc-e2e-test:latest"
}

source "docker" "e2e_test_env" {
  image  = var.base_image
  commit = true // 这是关键,它会在provisioner执行后将容器状态保存为新镜像
  changes = [
    "WORKDIR /app",
    // 覆盖基础镜像的ENTRYPOINT,使我们的测试脚本成为默认命令
    "ENTRYPOINT [\"/app/run_tests.sh\"]" 
  ]
}

build {
  name    = "build-2pc-test-image"
  sources = ["source.docker.e2e_test_env"]

  provisioner "file" {
    source      = "./services/"
    destination = "/app/services"
  }
  
  provisioner "file" {
    source      = "./docker-compose.yml"
    destination = "/app/docker-compose.yml"
  }
  
  provisioner "file" {
    source      = "./cypress/"
    destination = "/app/cypress"
  }
  
  provisioner "file" {
    source      = "./cypress.config.js"
    destination = "/app/cypress.config.js"
  }

  provisioner "file" {
    source      = "./run_tests.sh"
    destination = "/app/run_tests.sh"
  }
  
  provisioner "shell" {
    inline = [
      "chmod +x /app/run_tests.sh",
      // 在构建时就安装好所有依赖,包括 docker-compose
      "apt-get update && apt-get install -y docker-compose",
      "cd /app",
      // 在Packer构建过程中,直接构建并启动所有服务
      "docker-compose build",
      "docker-compose up -d"
    ]
  }
  
  // 构建完成后,对镜像打标签
  post-processor "docker-tag" {
    repository = "your-repo/2pc-e2e-test"
    tags       = ["latest", "v1.0.0"]
  }
}

这里的核心逻辑是:

  1. source "docker": 从一个包含Cypress和浏览器的基础镜像启动。commit = true至关重要,它指示Packer在所有provisioner执行完毕后,将容器的当前文件系统和状态保存(commit)为一个新的Docker镜像。
  2. provisioner "file": 将我们的Go服务代码、docker-compose.yml和Cypress测试代码全部复制到镜像内。
  3. provisioner "shell": 这是实现“烘焙”的步骤。它在构建容器内部安装docker-compose,然后执行docker-compose up -d。这意味着,当Packer完成构建时,最终生成的镜像里,我们的三个微服务已经处于运行状态。
  4. changes: 我们覆盖了基础镜像的ENTRYPOINT,使其指向一个自定义的测试启动脚本。

Cypress 测试与启动脚本

Cypress 测试需要验证2PC的成功和失败两种场景。

cypress.config.js

const { defineConfig } = require('cypress');

module.exports = defineConfig({
  e2e: {
    // 这里的baseUrl指向容器内部的协调者服务
    baseUrl: 'http://localhost:8080',
    // 关闭视频录制以减小镜像体积和执行时间
    video: false, 
    // 更激进的超时,因为网络是容器内部的,应该非常快
    defaultCommandTimeout: 5000, 
    setupNodeEvents(on, config) {
      // implement node event listeners here
    },
  },
});

cypress/e2e/2pc.cy.js

describe('Two-Phase Commit E2E Test', () => {

  // 使用一个全局变量来存储事务ID,以便在不同服务间查询状态
  let transactionId; 

  const getServiceState = (serviceUrl) => {
    // Cypress的baseUrl是协调者,所以这里要用完整的URL
    return cy.request({
      method: 'GET',
      url: `${serviceUrl}?tx_id=${transactionId}`,
      retryOnStatusCodeFailure: true,
      // failOnStatusCode: false,
    }).then(response => response.body.state);
  };

  it('should successfully commit a transaction when all participants agree', () => {
    cy.request({
      method: 'POST',
      url: '/start-transaction',
      // 在响应头中获取事务ID
    }).then(response => {
      expect(response.status).to.eq(200);
      expect(response.body).to.contain('Transaction Committed');

      // 从服务日志或特定端点获取TX ID在真实项目中更可靠
      // 这里我们为了简化,需要一种方式获取ID
      // 假设我们通过查询Coordinator的状态来获取最近一次的ID
      // 在真实系统中,Coordinator应返回ID
    });

    // 这里有个设计上的取舍:如何获取事务ID?
    // 我们的简单实现没有返回ID。在真实世界,API会返回它。
    // 为了测试,我们可以改造Coordinator,让它在 /state 端点暴露最近的tx_id
    // 或者,更简单的方式,我们直接轮询参与者的状态,直到找到一个'prepared'或'committed'的记录
    // 但这会使测试变复杂。
    // 目前的实现,我们只能断言最终结果,而无法精确验证中间状态,这是一个待改进点。
    // 我们在此假设测试重点是端到端的结果。
    
    // 由于我们无法轻易获取tx_id, 我们退而求其次,检查coordinator的全局状态
    cy.request('/state').its('body.state').should('eq', 'COMMITTED');

    // 完整的测试应查询参与者,但需要 tx_id
    // getServiceState('http://localhost:8081/state').should('eq', 'committed');
    // getServiceState('http://localhost:8082/state').should('eq', 'committed');
  });

  // 要测试回滚,我们需要一个不同的环境,其中一个参与者会失败。
  // 这正是Packer的强大之处,我们可以构建另一个镜像变体。
  it('is a placeholder for the rollback test which requires a different baked image', () => {
    // 比如,我们可以创建一个 `docker-compose.failure.yml`,
    // 然后用 `packer build -var "compose_file=docker-compose.failure.yml" ...` 来构建
    // 那个compose文件会给OrderService注入 `FORCE_PREPARE_FAILURE=true` 环境变量
    cy.log('This test would run against an image built to force a failure.');
  });
});

run_tests.sh

#!/bin/bash
set -e # 任何命令失败则立即退出

echo "Starting E2E test run..."

# 检查所有服务是否就绪。在真实项目中,这里应该用更健壮的健康检查。
# 例如,使用 `docker-compose ps` 或 nc/curl 循环检查端口。
# 由于服务已经在镜像构建时启动,它们应该立即可用。
sleep 5 # 简单等待,以防万一

# 执行Cypress测试
# `--headless` 在CI环境中运行
# `--browser chrome` 指定浏览器
# `--spec` 指定要运行的测试文件
cypress run --headless --browser chrome --spec "cypress/e2e/2pc.cy.js"

# 测试退出码会传递给脚本,如果测试失败,脚本也会失败,从而导致容器以非0状态退出
TEST_EXIT_CODE=$?

echo "Test run finished with exit code: $TEST_EXIT_CODE"

# 在CI中,容器的退出码决定了步骤的成败
exit $TEST_EXIT_CODE

这个脚本是镜像的入口点。它简单地执行Cypress命令。由于服务已经在镜像内部运行,无需任何额外的docker-compose up操作。

执行流程与成果

现在,构建和运行测试的流程变得极其简单:

  1. 构建 (一次性,或代码变更后):

    packer build .

    这个命令会产出一个名为 2pc-e2e-test:latest 的Docker镜像。这个镜像体积会比较大,因为它包含了整个操作系统、浏览器、Node.js环境、Go二进制文件以及运行中的服务。

  2. 运行测试 (在任何地方):

    docker run --rm your-repo/2pc-e2e-test:latest

    这个命令会启动容器,容器内的 run_tests.sh 脚本立即执行,运行Cypress测试套件,并将结果输出到控制台。测试结束后,容器自动销毁。整个过程不依赖任何外部服务,不拉取任何其他镜像,实现了完美的隔离性和可复现性。

方案的局限性与未来展望

这个方案并非没有代价。最明显的就是镜像体积。一个包含了浏览器和所有服务的镜像轻易就能达到数GB。这会增加CI缓存和镜像仓库的存储压力。可以通过多阶段构建(multi-stage builds for Go services)和清理apt缓存等方式进行优化,但本质问题依然存在。

其次,对于需要测试多种失败组合的复杂场景(如网络分区、特定参与者超时),为每种场景构建一个镜像变体可能会导致镜像数量激增。一种改进思路是在镜像入口点脚本中接收环境变量,通过这些变量来动态配置服务行为(如注入延迟、强制失败),从而让单个镜像能覆盖更多测试场景,但这又在一定程度上牺牲了纯粹的不可变性。

最后,这种将整个环境“冻结”在镜像中的做法,对于需要动态生成大量测试数据的场景支持不佳。任何测试数据的变更都需要重新构建镜像。对于这类场景,可能需要结合卷挂载或在容器启动时通过API注入数据,但这同样需要小心处理,避免破坏环境的一致性。


  目录