Building a Distributed, Real-Time Feature Flag Service with Raft and AWS SNS: A Practical Account


The original requirement came up during one of our sprints (we follow Scrum): we needed a highly available feature flag system. The team was tired of toggling functionality through configuration files and convoluted deployment pipelines. Commercial offerings were ruled out over data privacy and cost concerns, and a simple Redis-based approach carried single-point-of-failure risk. Our goal was to build an internal service that is distributed, strongly consistent, and able to broadcast flag changes globally in near real time.

That decision put us on the path of building it ourselves. The core problem is data consistency, which naturally leads to consensus protocols. Compared with the complexity of Paxos, Raft's excellent understandability made it our first choice. We did not intend to build a production-grade etcd from scratch, but rather a Raft core lightweight enough to serve the feature flag use case.

Sprint 1-3: Laying the Foundation of the Raft Core

Our first step was to define the core data structures and state machine of a Raft node. We chose Go because its strengths in concurrency and network programming suit this kind of project well.

The core of a Raft node is its state, split into persistent state and volatile state.

// file: raft_node.go

package raft

import (
	"log"
	"math/rand"
	"sync"
	"time"
)

// NodeState defines the role of a node in the Raft cluster.
type NodeState int

const (
	Follower NodeState = iota
	Candidate
	Leader
)

// LogEntry represents a command to be applied to the state machine.
// For our feature flag system, this will be a set/delete operation.
type LogEntry struct {
	Term    int
	Command interface{} // e.g., map[string]string{"op": "set", "key": "new_feature", "value": "true"}
}

// RaftNode represents a single node in the Raft cluster.
type RaftNode struct {
	mu sync.Mutex

	// Node identifiers
	id      string   // Unique ID for this node.
	peers   []string // IDs of other nodes in the cluster.

	// Persistent state on all servers:
	currentTerm int
	votedFor    string
	log         []LogEntry

	// Volatile state on all servers:
	commitIndex int
	lastApplied int
	state       NodeState

	// Volatile state on leaders:
	nextIndex  map[string]int
	matchIndex map[string]int

	// Channels for communication and control
	applyCh       chan<- ApplyMsg // Channel to send committed commands to the state machine.
	electionTimer *time.Timer
	heartbeat     *time.Ticker
	
	// A simple key-value store representing the actual state machine for feature flags.
	featureFlags map[string]string
}

// ApplyMsg is sent on the applyCh channel when a log entry is committed.
type ApplyMsg struct {
	CommandValid bool
	Command      interface{}
	CommandIndex int
}

// NewRaftNode creates a new Raft node.
func NewRaftNode(id string, peers []string, applyCh chan<- ApplyMsg) *RaftNode {
	node := &RaftNode{
		id:           id,
		peers:        peers,
		currentTerm:  0,
		votedFor:     "",
		log:          make([]LogEntry, 1), // Log starts with a dummy entry at index 0.
		commitIndex:  0,
		lastApplied:  0,
		state:        Follower,
		nextIndex:    make(map[string]int),
		matchIndex:   make(map[string]int),
		applyCh:      applyCh,
		featureFlags: make(map[string]string),
	}

	// In a real implementation, persistent state would be loaded from stable storage here.
	log.Printf("[Node %s] Initialized as Follower in term %d", id, node.currentTerm)
	node.resetElectionTimer()
	
	return node
}

// resetElectionTimer restarts the election timer with a random timeout.
// This randomness is crucial to prevent split votes.
func (rn *RaftNode) resetElectionTimer() {
	timeout := time.Duration(150+rand.Intn(150)) * time.Millisecond
	if rn.electionTimer == nil {
		rn.electionTimer = time.NewTimer(timeout)
	} else {
		rn.electionTimer.Reset(timeout)
	}
}

// ... other core Raft logic like becomeFollower, becomeCandidate, becomeLeader ...

The key piece here is the applyCh channel. Raft itself is only responsible for log replication and consensus; it does not care what the Command inside a log entry actually is. Once a log entry is confirmed committed (i.e., replicated to a majority of the cluster), the Raft node sends the command over applyCh to the upper layer (our feature flag state machine), which actually executes it.
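
The code that actually pushes committed entries onto applyCh is not shown above. Below is a minimal sketch of such a loop, assuming it lives in the same package as raft_node.go; the applyLoop name and the polling interval are our own illustrative choices, not part of the listings above.

// file: raft_apply.go (illustrative sketch)

package raft

import "time"

// applyLoop runs on every node. Whenever commitIndex advances past
// lastApplied, it hands each newly committed entry to the state machine
// via applyCh. Polling keeps the sketch short; a sync.Cond or a dedicated
// notification channel would avoid the busy wait.
func (rn *RaftNode) applyLoop() {
	for {
		rn.mu.Lock()
		var msgs []ApplyMsg
		for rn.lastApplied < rn.commitIndex {
			rn.lastApplied++
			entry := rn.log[rn.lastApplied]
			msgs = append(msgs, ApplyMsg{
				CommandValid: true,
				Command:      entry.Command,
				CommandIndex: rn.lastApplied,
			})
		}
		rn.mu.Unlock()

		// Send outside the lock so a slow consumer cannot block the Raft core.
		for _, msg := range msgs {
			rn.applyCh <- msg
		}

		time.Sleep(10 * time.Millisecond)
	}
}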

Below is a simplified main loop that handles election timeouts and heartbeats.

// file: raft_loop.go

package raft

import (
	"log"
	"time"
)

// run is the main loop for the Raft node.
func (rn *RaftNode) run() {
	for {
		switch rn.getState() {
		case Follower:
			select {
			case <-rn.electionTimer.C:
				log.Printf("[Node %s] Election timeout, becoming Candidate", rn.id)
				rn.becomeCandidate()
			// ... handle RPC requests ...
			}
		case Candidate:
			select {
			case <-rn.electionTimer.C:
				log.Printf("[Node %s] Election timeout, starting new election for term %d", rn.id, rn.currentTerm+1)
				rn.becomeCandidate() // Start a new election
			// ... handle RPC requests and vote results ...
			}
		case Leader:
			rn.sendHeartbeats()
			time.Sleep(50 * time.Millisecond) // Heartbeat interval
		}
	}
}

// sendHeartbeats is called by the leader to maintain authority.
func (rn *RaftNode) sendHeartbeats() {
	rn.mu.Lock()
	defer rn.mu.Unlock()

	for _, peerId := range rn.peers {
		if peerId == rn.id {
			continue
		}
		
		// In a real system, this would be an RPC call.
		go func(peer string) {
			// Construct AppendEntries RPC args, potentially empty for a heartbeat.
			// ...
			// response := sendAppendEntriesRPC(peer, args)
			// Handle response to update nextIndex, matchIndex, etc.
		}(peerId)
	}
}

With basic election and log replication in place, we backed the feature flag store with an in-memory map: whenever an ApplyMsg arrives, we update the map.
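
For completeness, here is a minimal sketch of that early apply path, assuming a goroutine in the same package drains applyCh; the runStateMachine name is an assumption. The SNS-aware version in a later sprint replaces this logic.

// runStateMachine drains committed commands and applies them to the
// in-memory feature-flag map. This is the pre-SNS version of the apply path.
func (rn *RaftNode) runStateMachine(applyCh <-chan ApplyMsg) {
	for msg := range applyCh {
		if !msg.CommandValid {
			continue
		}
		cmd, ok := msg.Command.(map[string]string)
		if !ok {
			continue // ignore commands we don't understand
		}
		rn.mu.Lock()
		switch cmd["op"] {
		case "set":
			rn.featureFlags[cmd["key"]] = cmd["value"]
		case "delete":
			delete(rn.featureFlags, cmd["key"])
		}
		rn.mu.Unlock()
	}
}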

Sprint 4: The API Layer and Introducing Caddy

The Raft cluster by itself is just a state machine; we need an HTTP API to interact with it. We added a simple HTTP server to the Go application, exposing two endpoints:

  • POST /set: sets or updates a feature flag.
  • GET /get/:key: queries the state of a feature flag.

The POST /set handler does not modify state directly. It submits the command to the Raft layer; if the current node is not the leader, the request is rejected and the client is told to retry against the leader.

// file: http_api.go

import (
	"fmt"
	"log"
	"net/http"
)

func (s *HttpServer) handleSetFlag(w http.ResponseWriter, r *http.Request) {
	// ... parse request body to get key and value ...
	var key, value string // populated from the parsed request body (parsing elided)

	command := map[string]string{"op": "set", "key": key, "value": value}
	
	// Submit the command to the Raft node.
	// The Start() method returns immediately. The command is processed asynchronously.
	index, term, isLeader := s.raftNode.Start(command)

	if !isLeader {
		// In a real system, you'd proxy the request to the leader or include
		// the leader's address in the response so the client can retry there.
		http.Error(w, "not the leader, retry against the current leader", http.StatusServiceUnavailable)
		return
	}

	// The challenge here is how to wait for the command to be committed.
	// A common pattern is to use a channel map, where the client waits on a
	// channel associated with the log index. For simplicity, we'll omit that here.
	
	log.Printf("Leader accepted command. Index: %d, Term: %d", index, term)
	w.WriteHeader(http.StatusOK)
	fmt.Fprintf(w, "Command accepted at index %d", index)
}
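
The "wait for commit" pattern mentioned in the comment can be sketched as a map from log index to a notification channel: the handler registers a channel keyed by the index returned from Start(), and the apply path closes that channel once the entry is applied. The names below (commitWaiter, registerWaiter, notifyCommitted) are illustrative assumptions, not our real API.

// file: commit_wait.go (illustrative sketch)

import "sync"

type commitWaiter struct {
	mu      sync.Mutex
	waiters map[int]chan struct{}
}

// registerWaiter returns a channel that is closed once the entry at `index`
// has been applied to the state machine.
func (cw *commitWaiter) registerWaiter(index int) <-chan struct{} {
	cw.mu.Lock()
	defer cw.mu.Unlock()
	ch := make(chan struct{})
	cw.waiters[index] = ch
	return ch
}

// notifyCommitted is called from the apply path after the entry at `index`
// has been applied.
func (cw *commitWaiter) notifyCommitted(index int) {
	cw.mu.Lock()
	defer cw.mu.Unlock()
	if ch, ok := cw.waiters[index]; ok {
		close(ch)
		delete(cw.waiters, index)
	}
}

// In handleSetFlag, after Start() succeeds on the leader, the handler would
// block with a timeout, roughly:
//
//	select {
//	case <-s.waiter.registerWaiter(index):
//		w.WriteHeader(http.StatusOK)
//	case <-time.After(2 * time.Second):
//		http.Error(w, "commit timeout", http.StatusGatewayTimeout)
//	}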

Next came exposing the service. We run multiple Raft nodes, each with its own HTTP server, and clients need a single entry point. Here we chose Caddy over Nginx, for three reasons:

  1. Automatic HTTPS: Caddy enables HTTPS by default and manages certificates automatically. For an internal service, that is a big win in both security and convenience.
  2. Concise configuration: the Caddyfile syntax is far simpler and more readable than Nginx configuration.
  3. Dynamism: Caddy's API-driven configuration lets us update backend nodes dynamically in the future, even though we use a static configuration at this stage.

Our Caddyfile is very simple:

# Caddyfile

feature-flags.internal.mycorp.com {
    # Automatic HTTPS for this internal domain
    tls internal

    # Log configuration
    log {
        output file /var/log/caddy/feature-flags.log {
            roll_size 10mb
            roll_keep 5
        }
        format console
        level INFO
    }

    # Reverse proxy requests to the Raft cluster nodes.
    # The `lb_policy` ensures we try another backend if one fails.
    reverse_proxy {
        to node1:8080 node2:8080 node3:8080
        lb_policy first
        health_uri /health
        health_interval 5s
        health_timeout 2s
    }

    # Error handling
    handle_errors {
        respond "{http.error.status_code} {http.error.status_text}"
    }
}

lb_policy first is deliberate here. Because only the leader can handle writes, clients implement their own retry logic to locate the leader (a sketch of such a client follows below). Reads can be served by any node, possibly returning stale data, which is acceptable for our use case.
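
A minimal sketch of that client-side retry loop, under simplified assumptions (the endpoint path and error handling are illustrative, not code from our services):

// file: client_example.go (illustrative sketch)

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// setFlag tries each node in turn until one accepts the write. A production
// client would also honor a leader hint returned by followers and cache the
// leader address between calls instead of scanning every time.
func setFlag(nodes []string, key, value string) error {
	payload, err := json.Marshal(map[string]string{"key": key, "value": value})
	if err != nil {
		return err
	}
	client := &http.Client{Timeout: 2 * time.Second}

	var lastErr error
	for _, node := range nodes {
		resp, err := client.Post("https://"+node+"/set", "application/json", bytes.NewReader(payload))
		if err != nil {
			lastErr = err
			continue
		}
		resp.Body.Close()
		if resp.StatusCode == http.StatusOK {
			return nil // this node is the leader and accepted the command
		}
		lastErr = fmt.Errorf("node %s rejected the write: %s", node, resp.Status)
	}
	return fmt.Errorf("no node accepted the write, last error: %w", lastErr)
}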

Sprint 5-6: Event-Driven Notifications with AWS SNS

After the system went live we hit a new problem. To pick up the latest flag state, other microservices had to poll the /get/:key endpoint constantly, generating a lot of wasted traffic. This is the classic case for moving from a pull model to a push model.

AWS was already part of our stack, so AWS SNS (Simple Notification Service) was a natural choice. When the Raft state machine applies a committed log entry, the leader node is responsible for publishing the change to an SNS topic.

First, we modified the state machine's apply logic:

// file: state_machine.go

import (
	"context"
	"encoding/json"
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sns"
	"github.com/aws/aws-sdk-go-v2/service/sns/types"
)

type SnsNotifier struct {
	client  *sns.Client
	topicArn string
}

func NewSnsNotifier(topicArn string) (*SnsNotifier, error) {
	cfg, err := config.LoadDefaultConfig(context.TODO())
	if err != nil {
		return nil, fmt.Errorf("failed to load AWS config: %w", err)
	}
	return &SnsNotifier{
		client:  sns.NewFromConfig(cfg),
		topicArn: topicArn,
	}, nil
}

// applyToStateMachine is where the actual state change happens.
// Now, it also triggers a notification.
func (rn *RaftNode) applyToStateMachine(command interface{}, notifier *SnsNotifier) {
	rn.mu.Lock()
	defer rn.mu.Unlock()

	cmdMap, ok := command.(map[string]string)
	if !ok {
		log.Printf("[Node %s] ERROR: Invalid command format", rn.id)
		return
	}
	
	op := cmdMap["op"]
	key := cmdMap["key"]
	value := cmdMap["value"]

	log.Printf("[Node %s] Applying command: %v", rn.id, cmdMap)
	
	var changed bool
	currentValue, exists := rn.featureFlags[key]

	switch op {
	case "set":
		if !exists || currentValue != value {
			rn.featureFlags[key] = value
			changed = true
		}
	case "delete":
		if exists {
			delete(rn.featureFlags, key)
			changed = true
		}
	}

	// IMPORTANT: Only the leader should publish the notification to avoid duplicates.
	if changed && rn.state == Leader && notifier != nil {
		go notifier.PublishChange(context.Background(), cmdMap)
	}
}

func (n *SnsNotifier) PublishChange(ctx context.Context, changeData map[string]string) {
	messageBody, err := json.Marshal(changeData)
	if err != nil {
		log.Printf("SNS_PUBLISH_ERROR: Failed to marshal change data: %v", err)
		return
	}
	
	input := &sns.PublishInput{
		Message:  aws.String(string(messageBody)),
		TopicArn: aws.String(n.topicArn),
		MessageAttributes: map[string]types.MessageAttributeValue{
			"key": {
				DataType:    aws.String("String"),
				StringValue: aws.String(changeData["key"]),
			},
		},
	}
	
	result, err := n.client.Publish(ctx, input)
	if err != nil {
		log.Printf("SNS_PUBLISH_ERROR: Failed to publish message: %v", err)
		return
	}
	
	log.Printf("SNS_PUBLISH_SUCCESS: Message ID %s published for key %s", *result.MessageId, changeData["key"])
}

Now every change to a feature flag is published. Other microservices simply subscribe to the SNS topic (usually through an SQS queue) and receive change events in near real time, eliminating polling entirely. This change significantly improved the overall architecture.

graph TD
    subgraph "Clients"
        ServiceA --- SQS1
        ServiceB --- SQS2
    end

    subgraph "AWS Cloud"
        SNSTopic[SNS Topic: feature-flag-changes] --> SQS1[SQS Queue for Service A]
        SNSTopic --> SQS2[SQS Queue for Service B]
    end
    
    subgraph "Feature Flag Service"
        Caddy[Caddy Reverse Proxy] --> Leader[Raft Leader]
        Caddy --> Follower1[Raft Follower]
        Caddy --> Follower2[Raft Follower]
    end

    Client[API Client] -- "1. HTTP POST /set" --> Caddy
    Leader -- "2. Replicate Log" --> Follower1
    Leader -- "2. Replicate Log" --> Follower2
    Leader -- "3. Commit & Apply" --> StateMachine[Internal State Machine]
    StateMachine -- "4. On Change (is Leader)" --> SNSTopic

    style Leader fill:#f9f,stroke:#333,stroke-width:2px
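
On the subscriber side, a service typically drains its SQS queue and unwraps the SNS envelope before reacting to the change. Below is a minimal sketch using aws-sdk-go-v2; the queue URL parameter, callback, and envelope handling are illustrative assumptions, not code from our consuming services.

// file: subscriber_example.go (illustrative sketch for a consuming service)

import (
	"context"
	"encoding/json"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
)

// snsEnvelope is the standard wrapper SNS puts around the published message
// when it is delivered to an SQS queue (with raw message delivery disabled).
type snsEnvelope struct {
	Message string `json:"Message"`
}

// pollFlagChanges long-polls the queue and invokes onChange for every
// flag-change event, e.g. to update a local in-memory cache of flags.
func pollFlagChanges(ctx context.Context, queueURL string, onChange func(map[string]string)) error {
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		return err
	}
	client := sqs.NewFromConfig(cfg)

	for {
		out, err := client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
			QueueUrl:            aws.String(queueURL),
			MaxNumberOfMessages: 10,
			WaitTimeSeconds:     20, // long polling
		})
		if err != nil {
			return err
		}
		for _, m := range out.Messages {
			var env snsEnvelope
			var change map[string]string
			if err := json.Unmarshal([]byte(*m.Body), &env); err == nil {
				_ = json.Unmarshal([]byte(env.Message), &change)
			}
			if change != nil {
				onChange(change)
			} else {
				log.Printf("skipping malformed message")
			}
			_, _ = client.DeleteMessage(ctx, &sqs.DeleteMessageInput{
				QueueUrl:      aws.String(queueURL),
				ReceiptHandle: m.ReceiptHandle,
			})
		}
	}
}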

Sprint 7-8: A Qwik-Powered Admin Dashboard

A command line and raw API calls are fine for developers, but product managers and the ops team need an intuitive UI. We decided to build a simple web dashboard to view and modify all feature flags.

We chose Qwik as the frontend framework. The choice may look unconventional, but it has one killer feature: resumability. A Qwik app is fully rendered on the server, with all state and event listeners serialized into the HTML. Once the browser downloads the HTML, the page is interactive without executing any JavaScript; only when the user actually interacts with a component is the tiny corresponding chunk of JavaScript downloaded and executed. For an internal dashboard that is visited infrequently but should respond instantly every time it is opened, this is close to perfect.

We created a core component, FeatureFlagDashboard:

// file: src/components/feature-flag-dashboard.tsx

import { component$, useStore, useVisibleTask$ } from '@builder.io/qwik';

interface FeatureFlag {
  key: string;
  value: string;
}

interface Store {
  flags: FeatureFlag[];
  isLoading: boolean;
  error: string | null;
}

export const FeatureFlagDashboard = component$(() => {
  const store = useStore<Store>({
    flags: [],
    isLoading: true,
    error: null,
  });

  // useVisibleTask$ runs only on the client when the component becomes visible.
  // Perfect for fetching initial data or setting up connections.
  useVisibleTask$(({ cleanup }) => {
    const fetchFlags = async () => {
      try {
        store.isLoading = true;
        // The API endpoint is proxied by Caddy
        const response = await fetch('/api/v1/flags/all');
        if (!response.ok) {
          throw new Error(`HTTP error! status: ${response.status}`);
        }
        const data = await response.json();
        store.flags = Object.entries(data).map(([key, value]) => ({ key, value: String(value) }));
      } catch (e: any) {
        store.error = e.message;
      } finally {
        store.isLoading = false;
      }
    };
    
    fetchFlags();

    // Open a WebSocket connection (also proxied by Caddy) to receive live
    // updates pushed from the backend, which relays them from SNS/SQS.
    const ws = new WebSocket('wss://feature-flags.internal.mycorp.com/ws');
    ws.onmessage = (event) => {
        const update = JSON.parse(event.data);
        console.log('Received update:', update);
        const index = store.flags.findIndex(f => f.key === update.key);
        if (index !== -1) {
            if (update.op === 'delete') {
                store.flags.splice(index, 1);
            } else {
                store.flags[index] = { key: update.key, value: update.value };
            }
        } else if (update.op === 'set') {
            store.flags.push({ key: update.key, value: update.value });
        }
    };

    ws.onerror = (err) => {
        console.error('WebSocket Error:', err);
        store.error = 'WebSocket connection failed.';
    }

    cleanup(() => ws.close());
  });

  return (
    <div>
      <h1>Feature Flags Dashboard</h1>
      {store.isLoading && <p>Loading...</p>}
      {store.error && <p style="color: red;">Error: {store.error}</p>}
      
      <table>
        <thead>
          <tr>
            <th>Key</th>
            <th>Value</th>
            <th>Actions</th>
          </tr>
        </thead>
        <tbody>
          {store.flags.map((flag) => (
            <tr key={flag.key}>
              <td>{flag.key}</td>
              <td>
                <input
                  type="text"
                  value={flag.value}
                  onChange$={async (event) => {
                    const newValue = (event.target as HTMLInputElement).value;
                    // Optimistic update
                    flag.value = newValue; 
                    // Send update to the backend
                    await fetch('/api/v1/flags/set', {
                      method: 'POST',
                      headers: {'Content-Type': 'application/json'},
                      body: JSON.stringify({ key: flag.key, value: newValue })
                    });
                  }}
                />
              </td>
              <td><button>Delete</button></td>
            </tr>
          ))}
        </tbody>
      </table>
    </div>
  );
});

This Qwik component fetches the initial data and receives real-time updates from the backend over a WebSocket (also proxied by Caddy). When the user edits an input field, it issues a POST request to our Raft service API, which triggers the whole Raft -> SNS -> WebSocket update flow, and the change eventually propagates in real time to every open dashboard.
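
The WebSocket leg of that chain is a small relay on the backend: it takes the change events drained from SQS and fans them out to every connected dashboard. A minimal sketch using gorilla/websocket follows; the hub structure and handler names are assumptions, not our actual implementation.

// file: ws_relay.go (illustrative sketch)

import (
	"log"
	"net/http"
	"sync"

	"github.com/gorilla/websocket"
)

// wsHub fans flag-change events out to every connected dashboard.
type wsHub struct {
	mu    sync.Mutex
	conns map[*websocket.Conn]struct{}
}

func newWSHub() *wsHub {
	return &wsHub{conns: make(map[*websocket.Conn]struct{})}
}

var upgrader = websocket.Upgrader{
	// The dashboard is served from the same internal domain via Caddy,
	// so a permissive origin check is acceptable here.
	CheckOrigin: func(r *http.Request) bool { return true },
}

// handleWS upgrades GET /ws requests and registers the connection.
func (h *wsHub) handleWS(w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Printf("ws upgrade failed: %v", err)
		return
	}
	h.mu.Lock()
	h.conns[conn] = struct{}{}
	h.mu.Unlock()
}

// broadcast is called for every change event drained from the SQS queue;
// the JSON payload is forwarded to the dashboards unchanged.
func (h *wsHub) broadcast(payload []byte) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for conn := range h.conns {
		if err := conn.WriteMessage(websocket.TextMessage, payload); err != nil {
			conn.Close()
			delete(h.conns, conn)
		}
	}
}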

Limitations of the Current Approach and Future Work

Over a handful of Scrum sprints we combined Raft, Caddy, AWS SNS, and Qwik, four seemingly unrelated pieces of technology, into a cohesive solution. It satisfies all of the original requirements: high availability, strong consistency, and near-real-time notification.

The system is far from perfect, though. Our hand-rolled Raft core omits many production-grade features, most critically log compaction (snapshots) and cluster membership changes. Without log compaction, the Raft log grows without bound and will eventually exhaust disk space. Without membership changes, we cannot add or remove nodes dynamically; scaling up or down requires manual intervention and downtime.

In addition, the notification path from the Raft leader through AWS SNS to the frontend is nicely decoupled but adds noticeable latency. For scenarios that need lower latency, we may have to implement a distribution mechanism directly on the Raft nodes and bypass SNS.

Future iterations will focus first on Raft snapshotting, which is the foundation for stable long-term operation, and then on dynamic cluster membership changes, which would give the service the elasticity expected of a cloud-native system.
