构建基于Go与OpenFaaS的混合推荐架构:平衡实时特征计算与列式存储查询成本


为应对突发性、非周期性的推荐请求,维持一个庞大的、永远在线的微服务集群,其闲置资源成本在真实项目中是难以接受的。特别是在用户行为稀疏、请求量两极分化严重的场景,传统架构要么为了应对峰值而过度配置,要么在流量洪峰来临时服务崩溃。核心矛盾在于:如何为冷启动用户或需要复杂实时特征的用户,在不牺牲过多响应时间的前提下,提供一个成本可控的计算模型。

传统的全量预计算方案无法捕捉用户的即时兴趣,而纯粹的实时计算服务则意味着巨大的固定资源开销。本文旨在探讨一种混合架构,试图在这两者之间找到一个工程上的平衡点。

定义问题:成本与延迟的权衡

一个推荐系统的核心是在毫秒级内返回个性化结果。这通常依赖于预先计算好的用户画像和物品向量。但当一个新用户出现,或者一个老用户的行为模式需要基于最近几秒的活动进行重新评估时,问题就出现了。

方案A:传统常驻型微服务架构

这是最直接的方案。一个或多个Go编写的微服务实例持续运行,通过负载均衡器接收请求。服务内部维护一个热数据缓存(如Redis),用于存储高频访问的用户特征。

  • 优势:

    1. 低延迟: 对于缓存命中的请求,响应极快。
    2. 状态管理: 服务是有状态的,可以方便地管理连接池、本地缓存等。
    3. 成熟生态: 部署、监控、日志等工具链非常成熟。
  • 劣势:

    1. 高昂的闲置成本: 无论有无请求,计算资源(CPU、内存)都必须被占用。对于一个拥有百万级但日活仅万级的应用,这意味着99%的时间资源都在空转。
    2. 容量规划复杂: 必须为预期的峰值流量进行容量规划,这通常导致资源的巨大浪费。自动伸缩(Autoscaling)可以缓解,但反应速度和伸缩粒度仍是问题。
    3. 部署笨重: 即便是一个小小的逻辑变更,也需要整个服务的滚动更新。

方案B:纯粹的Serverless架构

将整个推荐逻辑封装到一个函数中,由OpenFaaS这类平台管理。每个请求对应一次函数调用。

  • 优势:

    1. 极致的成本效益: 按需付费,无请求则无成本,完美契合稀疏请求模式。
    2. 自动伸缩: 平台自动处理从零到N的扩缩容,无需人工干预。
    3. 部署敏捷: 函数级别的独立部署和版本控制。
  • 劣势:

    1. 冷启动延迟: 这是Serverless的致命弱点。对于推荐系统这种对延迟敏感的应用,数百毫秒甚至秒级的冷启动时间是不可接受的。这包括了容器启动、代码初始化、数据库连接建立等一系列开销。
    2. 无状态限制: 函数实例通常是无状态、短生命周期的,难以维护数据库连接池等长效资源,每次调用都可能产生新的连接开销。
    3. 执行时间限制: 大多数Serverless平台对单次函数执行有时长限制,复杂的模型推理或特征计算可能超时。

最终选择:混合式架构——“看门狗”与“计算专家”

两种方案的优劣势都非常明显。一个务实的选择是融合二者,构建一个混合模型。

该架构由两部分组成:

  1. Go Gatekeeper (看门狗服务): 一个轻量级的、永远在线的Go微服务。它的职责非常单一:处理所有入口请求,管理高速缓存,并决定计算负载的路由。
  2. OpenFaaS Workers (计算专家函数): 一组部署在OpenFaaS上的Go函数。它们不直接面向用户,而是作为后台的计算单元,负责处理计算密集型的任务,如实时特征工程和模型推理。

工作流程如下:

sequenceDiagram
    participant User as 用户
    participant LB as 负载均衡器
    participant Gatekeeper as Go看门狗服务
    participant Cache as 高速缓存 (Redis)
    participant OpenFaaS as OpenFaaS网关
    participant Worker as 特征计算函数
    participant ColumnarDB as 列式数据库 (ClickHouse)

    User->>LB: GET /recommendations?user_id=123
    LB->>Gatekeeper: 转发请求
    Gatekeeper->>Cache: 查询用户特征 (user:123:features)
    alt 缓存命中
        Cache-->>Gatekeeper: 返回已计算的特征
        Gatekeeper-->>User: 基于缓存特征生成推荐并返回
    else 缓存未命中或过期
        Gatekeeper->>OpenFaaS: (异步)调用特征计算函数 process-user-features
        Gatekeeper-->>User: 返回默认或降级推荐结果
        OpenFaaS->>Worker: 触发函数执行
        Worker->>ColumnarDB: 查询用户近期行为日志
        ColumnarDB-->>Worker: 返回原始事件流
        Worker->>Worker: 执行复杂的特征计算
        Worker->>Cache: 将新特征写入缓存 (TTL=10m)
    end

架构决策理由:

  • 平衡成本与延迟: 常驻的Gatekeeper服务资源占用极小,只负责IO密集型的缓存读写和请求分发。计算密集型任务被卸载到可弹性伸缩至零的OpenFaaS函数中,极大地降低了闲置成本。
  • 优化用户体验: 对于缓存未命中的“冷”用户,系统可以立即返回一个通用的、非个性化的推荐结果,避免了用户长时间的等待。同时,后台异步触发的计算任务会“预热”该用户的缓存,使其下一次请求能获得高质量的个性化推荐。这种“最终个性化”的策略在许多场景下是完全可以接受的。
  • 技术栈协同:
    • Go: 极其适合编写高并发、低内存占用的Gatekeeper服务。其强大的标准库和并发模型能轻松处理大量网络连接。
    • OpenFaaS + Podman: OpenFaaS提供了成熟的Serverless平台能力。选择Podman作为其底层容器引擎,是出于其无守护进程(daemonless)的架构。在某些环境中,这能提供更好的安全隔离,并且在CI/CD流水线中作为一次性命令执行时,比需要启动守护进程的Docker更轻量。
    • 列式NoSQL (ClickHouse): 实时特征计算通常需要对用户的历史行为流(点击、购买、浏览)进行聚合分析。这类GROUP BY, COUNT, AVG操作是典型的OLAP查询。在行式数据库(如MySQL)上对海量日志数据执行这类查询是灾难性的,而列式数据库(如ClickHouse)为此而生,能够以亚秒级速度扫描数亿行数据完成聚合,是“计算专家”函数理想的数据源。

核心实现概览

1. Go Gatekeeper服务

这个服务的核心是HTTP服务器和缓存客户端逻辑。

main.go:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"time"

	"github.com/go-redis/redis/v8"
	"github.com/openfaas/faas-cli/proxy"
)

var (
	redisClient    *redis.Client
	faasGatewayURL string
	faasUsername   string
	faasPassword   string
)

const (
	cacheKeyPrefix = "user:features:"
	cacheTTL       = 10 * time.Minute
	functionName   = "realtime-feature-builder"
)

type UserFeatures struct {
	UserID         string    `json:"user_id"`
	ComputedAt     time.Time `json:"computed_at"`
	ClickThroughRate float64 `json:"click_through_rate"`
	RecentCategories []string  `json:"recent_categories"`
	// ... 更多特征
}

// init initializes connections and configurations
func init() {
	redisAddr := getEnv("REDIS_ADDR", "localhost:6379")
	faasGatewayURL = getEnv("OPENFAAS_URL", "http://127.0.0.1:8080")
	
    // 生产环境中,密码应从Kubernetes Secret或Vault中获取
	faasUsername = getEnv("OPENFAAS_USERNAME", "admin")
	faasPassword = getEnv("OPENFAAS_PASSWORD", "") // 读取密码的逻辑

	redisClient = redis.NewClient(&redis.Options{
		Addr: redisAddr,
	})

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	if _, err := redisClient.Ping(ctx).Result(); err != nil {
		log.Fatalf("Failed to connect to Redis: %v", err)
	}
	log.Println("Successfully connected to Redis.")
}


func main() {
	http.HandleFunc("/recommendations", recommendationHandler)
	log.Println("Starting Gatekeeper service on :8081")
	if err := http.ListenAndServe(":8081", nil); err != nil {
		log.Fatalf("Server failed to start: %v", err)
	}
}

// recommendationHandler is the main entry point for recommendation requests.
func recommendationHandler(w http.ResponseWriter, r *http.Request) {
	userID := r.URL.Query().Get("user_id")
	if userID == "" {
		http.Error(w, "user_id is required", http.StatusBadRequest)
		return
	}

	ctx := r.Context()
	cacheKey := cacheKeyPrefix + userID

	// 1. 尝试从缓存获取特征
	val, err := redisClient.Get(ctx, cacheKey).Result()
	if err == nil {
		// 缓存命中
		var features UserFeatures
		if err := json.Unmarshal([]byte(val), &features); err == nil {
			log.Printf("Cache hit for user %s. Features computed at %s", userID, features.ComputedAt)
			// 基于特征生成推荐 (此部分逻辑省略)
			respondWithJSON(w, http.StatusOK, map[string]string{"status": "ok", "source": "cache", "user_id": userID})
			return
		}
	}
    
    // 2. 缓存未命中或反序列化失败
	log.Printf("Cache miss for user %s. Triggering async feature computation.", userID)

	// 异步调用OpenFaaS函数
	go triggerFeatureComputation(userID)

	// 3. 立即返回降级结果
	respondWithJSON(w, http.StatusOK, map[string]string{"status": "ok", "source": "fallback", "user_id": userID, "message": "Personalized recommendations are being prepared."})
}

// triggerFeatureComputation calls the OpenFaaS function asynchronously.
func triggerFeatureComputation(userID string) {
	// 在生产环境中,这里应该使用更健壮的异步任务队列(如NATS, Kafka)
    // 而不是简单的goroutine,以保证任务不丢失。
	// 这里为了简化,直接使用goroutine和OpenFaaS的async endpoint
	
    // 注意:使用/async-function/会立即返回,函数在后台执行
	asyncURL := fmt.Sprintf("%s/async-function/%s", faasGatewayURL, functionName)

	cli, err := proxy.NewClient(faasUsername, faasPassword, faasGatewayURL, nil, &proxy.ClientAuth{}, 5*time.Second)
	if err != nil {
		log.Printf("[ERROR] Failed to create OpenFaaS client: %v", err)
		return
	}

	// 构造函数调用的 payload
	payload := []byte(fmt.Sprintf(`{"user_id": "%s"}`, userID))

	// 使用InvokeAsync进行异步调用
	// OpenFaaS的异步调用需要配置NATS等消息队列
	resp, err := cli.InvokeAsync(functionName, &payload, nil)
	if err != nil {
		log.Printf("[ERROR] Failed to invoke function '%s' for user %s: %v", functionName, userID, err)
		return
	}

	log.Printf("Successfully triggered function '%s' for user %s. Status code: %d", functionName, userID, resp.StatusCode)
}


func getEnv(key, fallback string) string {
	if value, ok := os.LookupEnv(key); ok {
		return value
	}
	return fallback
}

func respondWithJSON(w http.ResponseWriter, code int, payload interface{}) {
	response, _ := json.Marshal(payload)
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(code)
	w.Write(response)
}

单元测试思路:

  • recommendationHandler: 模拟HTTP请求,使用httptest包。
    • Case 1: redis.Client返回一个有效值,验证响应是否来自”cache”。
    • Case 2: redis.Client返回redis.Nil,验证响应是否是”fallback”,并验证triggerFeatureComputation是否被调用(可通过mock或channel通信来验证)。
    • Case 3: 请求缺少user_id,验证是否返回400错误。
  • triggerFeatureComputation: 需mock proxy.Client接口,验证InvokeAsync是否被以正确的参数调用。

2. OpenFaaS 特征计算函数

这个函数是计算核心。它接收用户ID,查询ClickHouse,计算特征,然后回写到Redis。

首先,定义 stack.yml 来描述函数。

stack.yml:

version: 1.0
provider:
  name: openfaas
  gateway: http://127.0.0.1:8080

functions:
  realtime-feature-builder:
    lang: go
    handler: ./realtime-feature-builder
    image: your-docker-repo/realtime-feature-builder:latest
    environment:
      # 在生产环境中,这些敏感信息应通过OpenFaaS secrets挂载
      CLICKHOUSE_ADDR: "clickhouse-server:9000"
      CLICKHOUSE_USER: "default"
      CLICKHOUSE_DB: "logs"
      REDIS_ADDR: "redis-master:6379"
    secrets:
      - clickhouse-password
      - redis-password
    labels:
        com.openfaas.scale.min: "0" # 允许缩容至0
        com.openfaas.scale.max: "20" # 最大实例数
    requests:
      cpu: 100m
      memory: 128Mi
    limits:
      cpu: 500m
      memory: 256Mi
  • 这里的com.openfaas.scale.min: "0"是实现成本控制的关键。

函数代码 realtime-feature-builder/handler.go:

package function

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/ClickHouse/clickhouse-go"
	"github.comcom/go-redis/redis/v8"
)

var (
	chClient    *sql.DB
	redisClient *redis.Client
)

type RequestPayload struct {
	UserID string `json:"user_id"`
}

type UserFeatures struct {
	UserID           string    `json:"user_id"`
	ComputedAt       time.Time `json:"computed_at"`
	TotalClicks      int64     `json:"total_clicks"`
	Last7DaysImpressions int64     `json:"last_7_days_impressions"`
	RecentCategories []string  `json:"recent_categories"`
}

// init a long-lived connection pool for the function instance.
// OpenFaaS会复用热函数的实例,所以init只会在冷启动时执行一次。
func init() {
	// ClickHouse connection
	chAddr := getEnv("CLICKHOUSE_ADDR", "localhost:9000")
	chUser := getEnv("CLICKHOUSE_USER", "default")
	chDb := getEnv("CLICKHOUSE_DB", "logs")
    // 从secrets读取密码
	chPasswordBytes, err := os.ReadFile("/var/openfaas/secrets/clickhouse-password")
	if err != nil {
		log.Fatalf("Failed to read clickhouse password secret: %v", err)
	}
	chPassword := string(chPasswordBytes)

	connect, err := sql.Open("clickhouse", fmt.Sprintf("tcp://%s?username=%s&password=%s&database=%s", chAddr, chUser, chPassword, chDb))
	if err != nil {
		log.Fatalf("Failed to connect to ClickHouse: %v", err)
	}
	if err := connect.Ping(); err != nil {
		if exception, ok := err.(*clickhouse.Exception); ok {
			log.Printf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
		}
		log.Fatalf("Ping to ClickHouse failed: %v", err)
	}
	chClient = connect
	log.Println("Successfully connected to ClickHouse.")

	// Redis connection
	redisAddr := getEnv("REDIS_ADDR", "localhost:6379")
	redisPasswordBytes, _ := os.ReadFile("/var/openfaas/secrets/redis-password")
	redisPassword := string(redisPasswordBytes)

	redisClient = redis.NewClient(&redis.Options{
		Addr:     redisAddr,
		Password: redisPassword,
	})

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if _, err := redisClient.Ping(ctx).Result(); err != nil {
		log.Fatalf("Failed to connect to Redis from function: %v", err)
	}
	log.Println("Successfully connected to Redis from function.")
}

// Handle a serverless request
func Handle(req []byte) string {
	var payload RequestPayload
	if err := json.Unmarshal(req, &payload); err != nil {
		return fmt.Sprintf("Error parsing request: %v", err)
	}
	if payload.UserID == "" {
		return "Error: user_id is missing from payload"
	}

	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) // 函数执行超时
	defer cancel()

	// 1. 从ClickHouse计算特征
	features, err := computeFeatures(ctx, payload.UserID)
	if err != nil {
		log.Printf("[ERROR] Failed to compute features for user %s: %v", payload.UserID, err)
		return fmt.Sprintf("Error computing features: %v", err)
	}

	// 2. 将结果写入Redis缓存
	if err := writeFeaturesToCache(ctx, features); err != nil {
		log.Printf("[ERROR] Failed to write features to cache for user %s: %v", payload.UserID, err)
		return fmt.Sprintf("Error writing to cache: %v", err)
	}

	log.Printf("Successfully computed and cached features for user %s", payload.UserID)
	return "Success"
}

// computeFeatures queries ClickHouse to generate user features.
func computeFeatures(ctx context.Context, userID string) (*UserFeatures, error) {
	// 这条SQL是列式数据库的强项: 在海量数据上进行快速聚合
	query := `
		SELECT
			uniqExact(if(event_type = 'click', session_id, NULL)) as total_clicks,
			countIf(event_type = 'impression' AND event_timestamp >= now() - INTERVAL 7 DAY) as impressions_7d,
			groupArray(DISTINCT category)(10) as recent_categories
		FROM user_events
		WHERE user_id = ?
	`
	rows, err := chClient.QueryContext(ctx, query, userID)
	if err != nil {
		return nil, fmt.Errorf("clickhouse query failed: %w", err)
	}
	defer rows.Close()

	var features UserFeatures
	features.UserID = userID
	features.ComputedAt = time.Now()

	if rows.Next() {
		if err := rows.Scan(&features.TotalClicks, &features.Last7DaysImpressions, &features.RecentCategories); err != nil {
			return nil, fmt.Errorf("failed to scan query result: %w", err)
		}
	} else {
        return nil, fmt.Errorf("no data found for user %s", userID)
    }

	return &features, nil
}

// writeFeaturesToCache saves the computed features to Redis.
func writeFeaturesToCache(ctx context.Context, features *UserFeatures) error {
	cacheKey := "user:features:" + features.UserID
	cacheTTL := 10 * time.Minute

	data, err := json.Marshal(features)
	if err != nil {
		return fmt.Errorf("failed to marshal features: %w", err)
	}

	err = redisClient.Set(ctx, cacheKey, data, cacheTTL).Err()
	if err != nil {
		return fmt.Errorf("redis SET failed: %w", err)
	}
	return nil
}

func getEnv(key, fallback string) string {
	if value, ok := os.LookupEnv(key); ok {
		return value
	}
	return fallback
}

这里的坑在于init函数只在容器实例首次创建(冷启动)时运行。后续对同一实例的调用会复用已有的数据库连接池。这是提升热函数性能的关键,但也意味着必须妥善处理连接的健康状态。clickhouse-go驱动和Go的database/sql包会处理连接池和失效连接的重试,但在极端情况下,比如数据库重启,连接池中的所有连接都可能失效,需要有相应的重连和错误处理逻辑。

3. Podman与OpenFaaS的集成

OpenFaaS本身对容器运行时是解耦的。在Kubernetes环境(通过faas-netes部署)中,它默认使用集群配置的CRI(Container Runtime Interface),如containerdCRI-O。Podman通过CRI-O可以无缝对接到Kubernetes中。

对于更轻量的faasd(单机版OpenFaaS)部署,它默认使用containerd。但faasd也可以被配置为使用Podman。这需要在faasd的systemd服务配置中,通过--cni-plugin=...等参数指定网络和运行时配置,让faasd通过CRI-O与Podman交互。在实践中,这意味着为faasd的部署环境预先安装和配置好podmancrio,这比默认的containerd设置要多一些步骤,但可以换来一个完全无守护进程的Serverless运行时环境。

架构的扩展性与局限性

  • 扩展性:

    1. 多模型/多策略: 可以轻松部署新的特征计算函数(如realtime-feature-builder-v2),Gatekeeper通过请求头或参数路由到不同版本的函数,实现A/B测试。
    2. 批处理复用: realtime-feature-builder函数本身是无状态的,它不仅可以被实时请求触发,也可以被定时任务(如OpenFaaS的Cron Connector)批量触发,用于周期性地为活跃用户预热缓存,进一步降低实时请求的延迟。
    3. 事件驱动: Gatekeeper可以替换为消息队列的消费者。例如,用户行为日志进入Kafka后,一个消费者服务根据规则判断是否需要更新特征,然后异步调用OpenFaaS函数。这使架构更加事件驱动。
  • 局限性:

    1. 首次请求延迟: 这是该架构为成本效益付出的主要代价。对于一个完全冷的用户,首次请求必然得到降级结果。这个trade-off是否可以接受,完全取决于业务场景。
    2. 数据一致性: 缓存中的特征数据与底层数据库存在时间差。在函数计算完成并回写缓存之前,用户的请求会一直使用旧的(或没有)特征。这是一种最终一致性模型。
    3. 运维复杂性: 引入了OpenFaaS、列式数据库等新组件,相比单体或简单微服务,整个系统的运维、监控和调试链路变得更长。需要对分布式系统的可观测性(日志、追踪、指标)有足够的投入。
    4. 函数冷启动: 尽管我们将用户感知的延迟通过异步化隐藏了,但函数本身的冷启动问题依然存在。如果某个函数长时间未被调用,当它被触发时,从Podman拉取镜像、创建容器到代码初始化,仍可能耗时数秒。对于需要“准实时”更新缓存的场景,可能需要配置最小实例数(min-scale: 1),但这又部分牺牲了Serverless的成本优势。

  目录