构建BentoML与MyBatis异构服务间的高性能实时特征管道


模型团队交付了一个效果出色的梯度提升模型,用于实时用户意图预测。然而,一个棘手的工程问题摆在了面前:模型推理所需的数十个关键特征,分散在多个生产数据库表中,而访问这些数据的唯一合规路径,是公司内一个稳定运行多年的Java核心服务。该服务使用MyBatis作为其唯一的持久层技术栈。直接连接数据库是被严令禁止的,不仅出于安全策略,也因为这会绕开服务层封装的复杂业务逻辑和缓存策略。

我们的任务是用Python生态的BentoML将模型封装为服务,同时必须从这个异构的Java/MyBatis服务中实时、低延迟地拉取特征。初步评估显示,每次预测请求需要拉取约20个特征,对特征服务的P99延迟要求在50ms以内。这是一个典型的、在技术栈演进中遇到的“新旧系统”集成问题。

最初的构想是,在BentoML服务的推理逻辑中,直接通过HTTP请求Java服务暴露的某个通用用户数据接口。但这很快被否决。通用接口返回的数据结构臃肿,包含了大量模型不需要的字段,网络开销和序列化成本会直接拖垮延迟。更重要的是,它并非为高并发、低延迟的特征查询场景设计。我们需要一个专用的、高性能的特征管道。

最终方案确定为:

  1. 在Java服务侧,专门开辟一个轻量级的RESTful API端点,用于批量特征查询。此端点的MyBatis层实现必须经过高度优化,避免任何不必要的数据库开销。
  2. 在BentoML侧,创建一个专用的Runner,负责与Java特征服务通信。将IO密集型的网络调用与CPU密集型的模型推理隔离,是BentoML设计的最佳实践。
  3. 使用Docker和Docker Compose将这个异构系统(Python服务、Java服务、数据库)完整地容器化,构建一个可一键启动、隔离且一致的开发与测试环境。

这个方案的核心,是在两个不同的技术孤岛之间,建立一座专用的、高速的桥梁。

graph TD
    subgraph "客户端"
        A[Client Request]
    end

    subgraph "Python 环境 (BentoML Service)"
        B[BentoML API Server]
        C[FeatureEnrichmentRunner]
        D[ModelInferenceRunner]
    end

    subgraph "Java 环境 (Feature Service)"
        E[Spring Boot Controller]
        F[MyBatis Mapper]
    end

    subgraph "数据存储"
        G[PostgreSQL Database]
    end

    A -- user_id --> B
    B -- user_id --> C
    C -- "HTTP GET /features?userIds=..." --> E
    E -- "调用" --> F
    F -- "SQL Query" --> G
    G -- "返回特征数据" --> F
    F -- "映射为DTO" --> E
    E -- "JSON Response" --> C
    C -- "返回特征向量" --> B
    B -- "特征向量" --> D
    D -- "执行模型推理" --> B
    B -- "预测结果" --> A

第一步:加固桥墩 - 优化Java特征服务与MyBatis查询

一切的起点是Java服务。如果它不能稳定、快速地提供数据,上游的BentoML无论如何优化都是徒劳。我们使用Spring Boot来快速构建这个专用的API端点。

1. 定义清晰的数据契约 (DTO)

在真实项目中,我们绝不能直接将MyBatis查询出的Domain Object或Map暴露给API。定义一个专用的数据传输对象(DTO)是必须的,它能精确控制暴露的字段,避免不必要的序列化开销,并作为服务间稳固的契约。

FeatureDTO.java:

package com.example.featureservice.dto;

import java.math.BigDecimal;
import java.time.Instant;

// DTO (Data Transfer Object)
// 只包含模型推理所必需的字段,命名清晰,类型明确。
// 相比于直接暴露数据库实体,DTO提供了更好的封装和隔离,
// 避免了不必要的数据传输和内部实现细节的泄露。
public class FeatureDTO {

    private String userId;
    private Long recentLogins; // 最近7天登录次数
    private BigDecimal totalOrderValue; // 历史总订单金额
    private Instant lastPurchaseAt; // 最后一次购买时间
    private Integer itemsInCart; // 购物车中商品数量

    // Getters and Setters ...

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public Long getRecentLogins() {
        return recentLogins;
    }

    public void setRecentLogins(Long recentLogins) {
        this.recentLogins = recentLogins;
    }

    public BigDecimal getTotalOrderValue() {
        return totalOrderValue;
    }

    public void setTotalOrderValue(BigDecimal totalOrderValue) {
        this.totalOrderValue = totalOrderValue;
    }

    public Instant getLastPurchaseAt() {
        return lastPurchaseAt;
    }

    public void setLastPurchaseAt(Instant lastPurchaseAt) {
        this.lastPurchaseAt = lastPurchaseAt;
    }

    public Integer getItemsInCart() {
        return itemsInCart;
    }

    public void setItemsInCart(Integer itemsInCart) {
        this.itemsInCart = itemsInCart;
    }
}

2. 设计高性能的Controller端点

这个端点必须支持批量查询。在机器学习场景中,我们常常需要一次性为一个小批次(mini-batch)的请求同时获取特征。通过批量查询,可以极大地减少网络往返次数和数据库连接开销。

FeatureController.java:

package com.example.featureservice.controller;

import com.example.featureservice.dto.FeatureDTO;
import com.example.featureservice.service.FeatureService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.Collections;
import java.util.List;
import java.util.Map;

@RestController
@RequestMapping("/features")
public class FeatureController {

    private static final Logger logger = LoggerFactory.getLogger(FeatureController.class);

    private final FeatureService featureService;

    public FeatureController(FeatureService featureService) {
        this.featureService = featureService;
    }

    // 支持批量查询,使用 List<String> userIds 作为参数
    // 这是一个关键的设计决策,避免了BentoML侧的N次调用,将其合并为1次。
    @GetMapping
    public ResponseEntity<Map<String, FeatureDTO>> getFeaturesForUsers(@RequestParam List<String> userIds) {
        if (userIds == null || userIds.isEmpty()) {
            return ResponseEntity.badRequest().body(Collections.emptyMap());
        }
        
        // 添加一个硬性限制,防止一次请求过多ID导致数据库压力过大。
        // 在生产环境中,这个值需要根据数据库性能和业务场景仔细调整。
        if (userIds.size() > 100) {
            logger.warn("Request for userIds exceeds limit of 100. Actual size: {}", userIds.size());
            return ResponseEntity.badRequest().body(Collections.emptyMap());
        }

        try {
            long startTime = System.nanoTime();
            Map<String, FeatureDTO> features = featureService.getFeaturesForUsers(userIds);
            long duration = (System.nanoTime() - startTime) / 1_000_000; // ms
            logger.info("Successfully fetched features for {} users in {} ms.", features.size(), duration);
            return ResponseEntity.ok(features);
        } catch (Exception e) {
            // 详细的错误日志对于排查问题至关重要
            logger.error("Error fetching features for userIds: {}", userIds, e);
            // 不向客户端暴露内部错误细节
            return ResponseEntity.internalServerError().build();
        }
    }
}

3. 编写高效的MyBatis Mapper

这是性能的关键。一个常见的错误是循环调用单用户查询。正确的方式是使用IN子句,将多次查询合并为一次。MyBatis的foreach标签是实现这一点的完美工具。

UserFeatureMapper.java (Interface):

package com.example.featureservice.mapper;

import com.example.featureservice.dto.FeatureDTO;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;

@Mapper
public interface UserFeatureMapper {

    /**
     * 为一组用户ID批量检索特征。
     * @param userIds 用户ID列表
     * @return FeatureDTO列表
     */
    List<FeatureDTO> findFeaturesByUserIds(@Param("userIds") List<String> userIds);
}

UserFeatureMapper.xml:

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.featureservice.mapper.UserFeatureMapper">

    <resultMap id="FeatureDTOResultMap" type="com.example.featureservice.dto.FeatureDTO">
        <id property="userId" column="user_id"/>
        <result property="recentLogins" column="recent_logins"/>
        <result property="totalOrderValue" column="total_order_value"/>
        <result property="lastPurchaseAt" column="last_purchase_at"/>
        <result property="itemsInCart" column="items_in_cart"/>
    </resultMap>

    <!--
        这是整个Java服务侧性能的核心。
        1. 使用`foreach`标签构建 `IN` 子句,这是批量查询的标准做法。
           `collection="userIds"` 对应Java方法参数中的@Param("userIds")。
           `item="userId"` 是循环中的变量名。
           `open="(" separator="," close=")"` 生成 `('id1', 'id2', ...)` 的SQL片段。
        2. SQL查询本身应该是高效的。这里的例子是一个跨多表的聚合查询,
           在生产环境中,这个查询关联的表(users, user_logins, orders, shopping_carts)
           必须在关联键(user_id)和过滤条件(login_time)上有正确的索引。
           否则,随着数据量增长,这里会成为性能瓶颈。
    -->
    <select id="findFeaturesByUserIds" resultMap="FeatureDTOResultMap">
        SELECT
            u.user_id,
            COALESCE(l.login_count, 0) AS recent_logins,
            COALESCE(o.total_value, 0.00) AS total_order_value,
            o.max_purchase_time AS last_purchase_at,
            COALESCE(c.item_count, 0) AS items_in_cart
        FROM
            users u
        LEFT JOIN (
            SELECT user_id, COUNT(*) AS login_count
            FROM user_logins
            WHERE login_time >= NOW() - INTERVAL '7 day'
            GROUP BY user_id
        ) l ON u.user_id = l.user_id
        LEFT JOIN (
            SELECT user_id, SUM(order_value) AS total_value, MAX(created_at) AS max_purchase_time
            FROM orders
            GROUP BY user_id
        ) o ON u.user_id = o.user_id
        LEFT JOIN (
            SELECT user_id, COUNT(*) AS item_count
            FROM shopping_carts
            GROUP BY user_id
        ) c ON u.user_id = c.user_id
        WHERE u.user_id IN
        <foreach item="userId" collection="userIds" open="(" separator="," close=")">
            #{userId}
        </foreach>
    </select>
</mapper>

第二步:架设桥面 - 实现BentoML特征获取Runner

在BentoML侧,我们不能将网络请求逻辑直接写在API处理函数中。BentoML的Runner机制可以将计算任务(无论是IO密集型还是CPU密集型)抽象出来,进行独立的扩展和资源配置。

1. 定义特征获取Runner

这个Runner的核心职责是:接收用户ID,调用Java API,处理响应,并将其转换为模型需要的格式。使用httpx的异步客户端是现代Python网络编程的首选,它能很好地与BentoML的异步服务集成。

feature_fetcher.py:

import typing as t
import asyncio
import bentoml
import httpx
from bentoml.io import Text, JSON

# 在真实项目中,URL应该来自配置管理,而不是硬编码。
# "java-feature-service" 是我们在Docker Compose中定义的服务名。
# Docker的内部DNS会将其解析到Java容器的IP地址。
FEATURE_SERVICE_URL = "http://java-feature-service:8080/features"

# 定义Runner的输入和输出类型
FeatureDict = t.Dict[str, t.Union[str, int, float, None]]

# 定义一个自定义的异常,用于更好地处理网络或服务端的错误
class FeatureFetchError(Exception):
    pass

@bentoml.service_runner
class FeatureFetcherRunner:
    def __init__(self):
        # 初始化一个异步HTTP客户端。
        # 将其放在__init__中可以重用连接,避免为每个请求创建新客户端的开销。
        # timeout设置为5秒,防止下游服务延迟过高导致整个请求卡死。
        self.client = httpx.AsyncClient(timeout=5.0)
        bentoml.logger.info("FeatureFetcherRunner initialized with HTTP client.")

    @bentoml.runner_method(batchable=True, batch_dim=0)
    async def fetch_features(self, user_ids: t.List[str]) -> t.List[FeatureDict]:
        """
        这个方法被标记为 batchable=True,这是BentoML性能优化的关键。
        BentoML的API服务器会收集在短时间内到达的多个请求,
        将它们的user_ids合并成一个列表,然后一次性调用这个方法。
        这完美地匹配了我们Java API的批量查询能力。
        """
        if not user_ids:
            return []
        
        bentoml.logger.debug(f"Fetching features for batch of size {len(user_ids)}")
        
        try:
            # 构建请求参数
            params = [("userIds", user_id) for user_id in user_ids]
            response = await self.client.get(FEATURE_SERVICE_URL, params=params)
            response.raise_for_status() # 如果HTTP状态码是4xx或5xx,则抛出异常
            
            data: t.Dict[str, t.Dict] = response.json()

            # 将Java服务返回的Map结构重新整理成与输入user_ids顺序一致的列表
            # 这是非常重要的一步,因为下游的模型推理Runner期望输入和输出的顺序是一致的。
            # 如果某个用户没有特征,我们提供一个默认的空字典。
            results = [data.get(user_id, {}) for user_id in user_ids]
            return results
            
        except httpx.HTTPStatusError as e:
            bentoml.logger.error(f"HTTP error fetching features: {e.response.status_code} - {e.response.text}")
            raise FeatureFetchError(f"Feature service returned status {e.response.status_code}") from e
        except httpx.RequestError as e:
            bentoml.logger.error(f"Network error fetching features: {e}")
            raise FeatureFetchError(f"Network error connecting to feature service") from e
        except Exception as e:
            bentoml.logger.error(f"An unexpected error occurred in FeatureFetcherRunner: {e}")
            raise

2. 编排服务逻辑

现在,我们定义BentoML的主服务,它将协调特征获取Runner和(一个模拟的)模型推理Runner。

service.py:

import bentoml
import numpy as np
from bentoml.io import JSON

# 导入我们刚刚定义的Runner
from feature_fetcher import FeatureFetcherRunner, FeatureDict

# 假设我们有一个模型Runner,这里用一个简单的实现来模拟
# 在实际项目中,它会加载一个真实的模型(如scikit-learn, xgboost等)
@bentoml.service_runner
class MockModelRunner:
    @bentoml.runner_method(batchable=True, batch_dim=0)
    def predict(self, features: t.List[FeatureDict]) -> np.ndarray:
        # 简单的模拟逻辑:基于特征数量进行预测
        predictions = [len(f) for f in features]
        return np.array(predictions)

# 注入Runner实例
feature_fetcher_runner = bentoml.Runner(FeatureFetcherRunner, name="feature_fetcher")
model_runner = bentoml.Runner(MockModelRunner, name="mock_model_runner")

# 定义BentoML服务
# workers=-1 会让BentoML根据CPU核心数自动设置worker数量
svc = bentoml.Service(
    "realtime_feature_pipeline", 
    runners=[feature_fetcher_runner, model_runner]
)

# 定义API输入的数据结构
class UserRequest(bentoml.Schema):
    user_id: str

@svc.api(input=JSON(schema=UserRequest), output=JSON())
async def predict(request: UserRequest):
    # API层接收单个请求
    user_id = request.user_id
    
    # 调用特征Runner。即使这里只传递一个ID,BentoML的动态批处理机制
    # 也会在后台将其与其他并发请求合并,然后批量发送给Runner。
    feature_dict = await feature_fetcher_runner.fetch_features.async_run([user_id])
    
    # Runner的批量方法返回的是列表,我们需要取第一个元素
    if not feature_dict:
        return {"user_id": user_id, "error": "Failed to fetch features"}
        
    # 调用模型Runner进行推理
    prediction_result = await model_runner.predict.async_run([feature_dict[0]])
    
    return {
        "user_id": user_id,
        "prediction": prediction_result[0].item(), # .item() 从numpy类型转为python原生类型
        "features_used": feature_dict[0]
    }

第三步:连接桥梁 - 使用Docker Compose集成所有组件

现在,我们需要一种方法来可靠地启动和连接这三个组件:BentoML服务、Java特征服务和一个PostgreSQL数据库。Docker Compose是实现这一目标的理想工具。

1. Java服务的Dockerfile

使用多阶段构建可以显著减小最终镜像的体积,这对于生产部署至关重要。

feature-service/Dockerfile:

# --- Build Stage ---
FROM maven:3.8.4-openjdk-17 AS build
WORKDIR /app
COPY pom.xml .
COPY src ./src
# -DskipTests to speed up the build process in CI/CD
RUN mvn clean package -DskipTests

# --- Runtime Stage ---
FROM openjdk:17-slim
WORKDIR /app
# 从构建阶段复制最终的jar包
COPY --from=build /app/target/*.jar app.jar
EXPOSE 8080
# 设置JVM参数是一个好习惯,特别是在容器环境中
# -XX:+UseContainerSupport 让JVM能正确识别容器的内存限制
ENTRYPOINT ["java", "-XX:+UseContainerSupport", "-jar", "app.jar"]

2. BentoML服务的bentofile.yaml

这是BentoML项目的配置文件,定义了服务、依赖和Docker构建选项。

bentofile.yaml:

service: "service:svc"
labels:
  owner: ml-platform-team
  project: real-time-pipeline
include:
  - "*.py"
python:
  packages:
    - bentoml
    - httpx
    - numpy
docker:
  distro: debian
  python_version: "3.9"

3. 顶层docker-compose.yml

这个文件是整个系统的启动蓝图。它定义了服务、它们之间的依赖关系以及网络。

docker-compose.yml:

version: '3.8'

services:
  # 1. 数据库服务
  db:
    image: postgres:13
    container_name: feature_db
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=feature_store
    ports:
      - "5432:5432"
    volumes:
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U user -d feature_store"]
      interval: 5s
      timeout: 5s
      retries: 5

  # 2. Java特征服务
  java-feature-service:
    build:
      context: ./feature-service # 指向Java项目的目录
    container_name: java_feature_service
    ports:
      - "8080:8080"
    environment:
      # Spring Boot的数据库配置,指向db服务
      - SPRING_DATASOURCE_URL=jdbc:postgresql://db:5432/feature_store
      - SPRING_DATASOURCE_USERNAME=user
      - SPRING_DATASOURCE_PASSWORD=password
    depends_on:
      db: # 确保在Java服务启动前,数据库已准备就绪
        condition: service_healthy

  # 3. Python BentoML服务
  bentoml-prediction-service:
    build:
      context: ./bentoml-service # 指向BentoML项目的目录
      # BentoML的 `bentoml build` 命令会生成一个Dockerfile,我们在这里引用它
      dockerfile: Dockerfile 
    container_name: bentoml_prediction_service
    ports:
      - "3000:3000"
    depends_on:
      - java-feature-service
    environment:
      # 这个环境变量可以被BentoML代码读取,虽然我们在代码中硬编码了URL
      # 但更好的实践是通过环境变量配置
      - FEATURE_SERVICE_URL=http://java-feature-service:8080/features

为了让这个Compose文件能工作,我们还需要一个init.sql来初始化数据库表和一些种子数据。

init.sql:

CREATE TABLE users (user_id VARCHAR(50) PRIMARY KEY, created_at TIMESTAMPTZ DEFAULT NOW());
CREATE TABLE user_logins (login_id SERIAL PRIMARY KEY, user_id VARCHAR(50), login_time TIMESTAMPTZ DEFAULT NOW());
CREATE TABLE orders (order_id SERIAL PRIMARY KEY, user_id VARCHAR(50), order_value NUMERIC(10, 2), created_at TIMESTAMPTZ DEFAULT NOW());
CREATE TABLE shopping_carts (item_id SERIAL PRIMARY KEY, user_id VARCHAR(50), item_name VARCHAR(100));

INSERT INTO users (user_id) VALUES ('user123'), ('user456');
INSERT INTO user_logins (user_id) VALUES ('user123'), ('user123'), ('user456');
INSERT INTO orders (user_id, order_value) VALUES ('user123', 99.99), ('user456', 25.50), ('user456', 120.00);
INSERT INTO shopping_carts (user_id, item_name) VALUES ('user123', 'itemA');

在BentoML项目目录下运行bentoml build会生成一个包含Dockerfile的bentoml_service目录。之后,在项目根目录运行docker-compose up --build,整个异构系统就会启动。我们可以通过向http://localhost:3000/predict发送请求来测试整个管道。

# 测试请求
curl -X POST \
  http://localhost:3000/predict \
  -H 'Content-Type: application/json' \
  -d '{"user_id": "user123"}'

# 预期响应
# {
#  "user_id": "user123",
#  "prediction": 4, 
#  "features_used": {
#    "userId": "user123",
#    "recentLogins": 2,
#    "totalOrderValue": 99.99,
#    "lastPurchaseAt": "...",
#    "itemsInCart": 1
#  }
# }

方案的局限性与未来迭代路径

这套基于HTTP REST和批量查询的方案,作为连接异构系统的务实第一步是有效的。它清晰、易于实现且便于调试。然而,它的局限性也很明显。

首先,REST/JSON的组合带来了不可忽视的序列化和网络开销。对于P99要求在个位数毫秒的极端场景,这可能会成为瓶颈。一个自然的演进方向是采用gRPC。通过Protobuf进行二进制序列化,并利用HTTP/2的多路复用,可以显著降低通信延迟。但这会增加开发复杂性,尤其是在Java和Python之间维护.proto契约。

其次,当前的架构中,Java特征服务是一个单点。尽管可以通过水平扩展部署多个实例来提高吞吐量和可用性,但它依然是整个预测流程中的一个强依赖。长远来看,更健壮的架构可能会引入一个专门的、独立于核心Java应用的“在线特征存储”(Online Feature Store),例如Feast、Redis或Tair。数据可以通过CDC(变更数据捕获)从主数据库实时同步到特征存储中,这样ML服务就可以直接高速访问特征数据,与核心Java服务完全解耦。

最后,跨语言服务的可观测性是一个挑战。当一次请求失败时,问题可能出在BentoML、Java服务、MyBatis查询或网络层面。必须建立一套统一的分布式追踪体系,例如使用OpenTelemetry,将Trace ID从BentoML服务一直传递到Java服务内部,才能快速定位故障点。


  目录