构建一套由Java服务编排的混合式RAG数据管道


我们面临的挑战并非简单的信息检索。用户提交的查询是模糊的、上下文相关的,而知识库则分裂为两部分:一部分是存储在关系型数据库中高度结构化的实体数据,另一部分是散落在文档、报告和笔记中的海量非结构化文本。单纯的关键字匹配或全文搜索早已失效,而单一的向量搜索又无法精确利用已有的结构化信息。问题的核心是:如何构建一个查询系统,既能理解自然语言的语义,又能精确地利用结构化数据进行过滤和增强,最终将两者无缝融合。

方案权衡:一体化JVM方案 vs. 异构服务架构

一个直接的想法是将所有处理逻辑都限制在JVM生态系统内。我们可以使用像Apache OpenNLP或Stanford CoreNLP这样的Java库进行文本处理,利用DJL (Deep Java Library) 或类似框架加载嵌入模型进行向量化。

方案A:纯Java一体化架构

  • 优势:

    • 单一技术栈,简化了构建、部署和监控流程。
    • 无跨语言调用的网络开销,理论上降低了处理链路的延迟。
    • 事务管理和数据一致性控制相对直接。
  • 劣势:

    • NLP生态的局限性: 尽管Java的NLP库功能强大,但在模型更新速度、社区支持以及前沿算法的实现上,与Python生态(Hugging Face, spaCy, NLTK)存在显著差距。在真实项目中,快速迭代和验证最新的NLP模型至关重要。
    • 资源管理的耦合: NLP模型,特别是深度学习模型,对内存和可能的GPU资源有特殊要求。将其与提供API和业务逻辑的Web服务部署在同一进程中,会导致严重的资源竞争和隔离问题。一个内存泄漏或GPU驱动问题可能拖垮整个应用。
    • 维护与迭代的僵化: 将快速变化的AI模型与相对稳定的业务后端紧密耦合,违反了单一职责原则。每次模型更新都可能需要对整个单体服务进行完整的回归测试和重新部署,这在敏捷开发环境中是不可接受的。

方案B:Java核心编排 + Python NLP服务的异构架构

这个方案将系统职责明确分离。Java承担其最擅长的部分:构建稳定、高并发的后端服务、业务逻辑编排、数据持久化以及对外提供健壮的API。而将计算密集且依赖特定生态的NLP任务,则剥离到一个独立的Python服务中。

  • 优势:

    • 技术栈专业化: 为每个任务选择最合适的工具。Java用于构建企业级后端,Python用于利用其顶级的机器学习和NLP生态系统。
    • 独立扩展与部署: NLP服务可以根据其独特的资源需求(例如,需要GPU实例)进行独立部署和扩展,而不会影响核心Java服务。反之亦然。
    • 容错与隔离: Python服务的故障或性能瓶颈被隔离,不会直接导致整个系统的崩溃。通过gRPC等框架的熔断和重试机制,可以构建更具韧性的系统。
  • 劣劣:

    • 架构复杂性增加: 引入了跨服务通信,需要处理网络延迟、序列化、服务发现和接口版本控制等问题。
    • 运维成本: 需要维护两个独立的服务、两套构建流水线和两套监控体系。

决策:

在真实项目中,迭代速度和模型效果往往是决定成败的关键。方案A的短期便利性无法弥补其长期在模型迭代和系统可维护性上的巨大缺陷。因此,我们选择方案B。其引入的架构复杂性是可控的,并且可以通过成熟的工具(如gRPC, Docker, Kubernetes)来有效管理。这是一种面向未来的投资,为系统的长期演进保留了最大的灵活性。

核心实现概览

我们的系统由几个关键部分组成:一个用于数据处理和向量化的Python NLP服务,一个作为系统大脑的Java编排服务,一个向量数据库Weaviate,一个GraphQL API网关,以及一个基于Solid.js的响应式前端。

graph TD
    subgraph "用户端"
        A[Solid.js UI]
    end

    subgraph "API层"
        B[GraphQL Gateway]
    end

    subgraph "核心后端 (Java)"
        C{Java Orchestration Service}
        D[PostgreSQL for Structured Data]
    end

    subgraph "AI/NLP服务 (Python)"
        E{Python NLP Service}
        F[spaCy for Text Processing]
        G[Sentence Transformers for Embeddings]
    end

    subgraph "数据存储"
        H[Weaviate Vector Database]
    end

    A -- GraphQL Query --> B
    B -- Resolves Query --> C
    C -- gRPC Call (Text Processing) --> E
    C -- JDBC --> D
    C -- Weaviate Client --> H
    E -- Uses --> F
    E -- Uses --> G
    H -- Stores/Retrieves --> Vectors & Payloads

数据管道:从非结构化文本到可查询向量

数据管道的质量直接决定了检索结果的上限。这里的核心是从原始文本中提取出高质量的、带有元数据的文本块(chunks),并将其转化为向量。

1. 通信契约:gRPC Proto定义

Java和Python服务之间的通信采用gRPC,因为它性能高、强类型,并且支持双向流。

nlp_service.proto

syntax = "proto3";

package com.yourcompany.nlp.grpc;

option java_multiple_files = true;
option java_package = "com.yourcompany.nlp.grpc";
option java_outer_classname = "NlpServiceProto";

// The service definition.
service NlpProcessingService {
  // Processes a raw document into structured chunks.
  rpc ProcessDocument (DocumentRequest) returns (DocumentResponse) {}
}

message DocumentRequest {
  string document_id = 1;
  string raw_text = 2;
  // Metadata like source, author, etc.
  map<string, string> metadata = 3;
}

message DocumentResponse {
  string document_id = 1;
  repeated Chunk chunks = 2;
  repeated NamedEntity entities = 3;
}

message Chunk {
  string chunk_id = 1;
  string text = 2;
  int32 start_char = 3;
  int32 end_char = 4;
}

message NamedEntity {
  string text = 1;
  string label = 2; // e.g., "PERSON", "ORG", "GPE"
  int32 start_char = 3;
  int32 end_char = 4;
}

2. Python NLP 服务实现

这个服务是数据处理的核心。它接收原始文本,使用spaCy进行分句和实体识别。spaCy的sentencizer组件比简单的基于标点符号的分割要鲁棒得多。

nlp_server.py

import grpc
from concurrent import futures
import spacy
from sentence_transformers import SentenceTransformer
# Import generated gRPC files
import nlp_service_pb2
import nlp_service_pb2_grpc

# It's crucial to load models once at startup, not per request.
# In a production setup, consider a more robust model loading strategy.
print("Loading spaCy model...")
NLP = spacy.load("en_core_web_sm")
print("Loading SentenceTransformer model...")
# This part is for a potential future use case where embedding is also done in Python.
# For now, we only focus on chunking and NER.
# MODEL = SentenceTransformer('all-MiniLM-L6-v2') 
print("Models loaded.")

class NlpProcessingServiceImpl(nlp_service_pb2_grpc.NlpProcessingServiceServicer):
    def ProcessDocument(self, request, context):
        """
        Processes raw text using spaCy for sentence chunking and named entity recognition.
        """
        raw_text = request.raw_text
        doc_id = request.document_id
        
        if not raw_text:
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details("Raw text cannot be empty.")
            return nlp_service_pb2.DocumentResponse()
            
        try:
            doc = NLP(raw_text)
            
            # Create chunks based on sentences
            chunks = []
            for i, sent in enumerate(doc.sents):
                chunk_id = f"{doc_id}-chunk-{i}"
                chunk = nlp_service_pb2.Chunk(
                    chunk_id=chunk_id,
                    text=sent.text.strip(),
                    start_char=sent.start_char,
                    end_char=sent.end_char
                )
                chunks.append(chunk)

            # Extract named entities from the whole document
            entities = []
            for ent in doc.ents:
                entity = nlp_service_pb2.NamedEntity(
                    text=ent.text,
                    label=ent.label_,
                    start_char=ent.start_char,
                    end_char=ent.end_char
                )
                entities.append(entity)

            return nlp_service_pb2.DocumentResponse(
                document_id=doc_id,
                chunks=chunks,
                entities=entities
            )
        except Exception as e:
            # Proper logging should be used here.
            print(f"Error processing document {doc_id}: {e}")
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(f"An internal error occurred: {e}")
            return nlp_service_pb2.DocumentResponse()

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    nlp_service_pb2_grpc.add_NlpProcessingServiceServicer_to_server(NlpProcessingServiceImpl(), server)
    # Use a non-privileged port.
    server.add_insecure_port('[::]:50051')
    print("Starting gRPC server on port 50051...")
    server.start()
    server.wait_for_termination()

if __name__ == '__main__':
    serve()

3. Java编排服务:调用与存储

Java服务负责读取数据源(例如,一个文件或数据库记录),调用Python服务,然后将处理好的数据和向量写入Weaviate。

WeaviateIngestionService.java

package com.yourcompany.rag.ingestion;

import com.yourcompany.nlp.grpc.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.weaviate.client.WeaviateClient;
import io.weaviate.client.v1.data.model.WeaviateObject;
import io.weaviate.client.v1.schema.model.Property;
import io.weaviate.client.v1.schema.model.WeaviateClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@Service
public class DocumentIngestionService {

    private static final Logger logger = LoggerFactory.getLogger(DocumentIngestionService.class);
    private final WeaviateClient weaviateClient;
    private final NlpProcessingServiceGrpc.NlpProcessingServiceBlockingStub nlpClient;
    private final EmbeddingService embeddingService; // Assume this service handles vectorization

    @Autowired
    public DocumentIngestionService(WeaviateClient weaviateClient, EmbeddingService embeddingService) {
        this.weaviateClient = weaviateClient;
        this.embeddingService = embeddingService;

        // The channel should be managed as a bean in a real Spring application.
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
                .usePlaintext()
                .build();
        this.nlpClient = NlpProcessingServiceGrpc.newBlockingStub(channel)
                .withDeadlineAfter(10, TimeUnit.SECONDS); // Set a deadline, crucial for production
    }

    public void ensureSchemaExists() {
        // A common mistake is not defining the schema idempotently.
        // This check prevents errors on subsequent runs.
        var result = weaviateClient.schema().classGetter().withClassName("DocumentChunk").run();
        if (result.hasErrors() || result.getResult() == null) {
            logger.info("Schema 'DocumentChunk' not found. Creating...");
            WeaviateClass documentChunkClass = WeaviateClass.builder()
                .className("DocumentChunk")
                .description("A chunk of text from a larger document")
                .vectorizer("none") // We provide our own vectors
                .properties(List.of(
                    Property.builder().name("documentId").dataType(List.of("text")).build(),
                    Property.builder().name("chunkId").dataType(List.of("text")).build(),
                    Property.builder().name("content").dataType(List.of("text")).build(),
                    Property.builder().name("source").dataType(List.of("text")).build()
                ))
                .build();
            weaviateClient.schema().classCreator().withClass(documentChunkClass).run();
        }
    }

    public void ingestDocument(String documentId, String rawText, String source) {
        logger.info("Starting ingestion for document ID: {}", documentId);
        ensureSchemaExists();

        DocumentRequest request = DocumentRequest.newBuilder()
                .setDocumentId(documentId)
                .setRawText(rawText)
                .putMetadata("source", source)
                .build();

        DocumentResponse response;
        try {
            response = nlpClient.processDocument(request);
        } catch (StatusRuntimeException e) {
            logger.error("gRPC call failed: {}", e.getStatus());
            // Implement retry logic or dead-letter queue here.
            return;
        }

        List<String> chunkTexts = response.getChunksList().stream()
                .map(Chunk::getText)
                .collect(Collectors.toList());

        // Batch embedding generation is far more efficient.
        float[][] vectors = embeddingService.generateEmbeddings(chunkTexts);

        List<WeaviateObject> objectsToCreate = new java.util.ArrayList<>();
        for (int i = 0; i < response.getChunksList().size(); i++) {
            Chunk chunk = response.getChunksList().get(i);
            Map<String, Object> properties = new HashMap<>();
            properties.put("documentId", documentId);
            properties.put("chunkId", chunk.getChunkId());
            properties.put("content", chunk.getText());
            properties.put("source", source);

            objectsToCreate.add(WeaviateObject.builder()
                    .className("DocumentChunk")
                    .id(UUID.randomUUID().toString()) // Weaviate manages IDs, but providing one is good practice.
                    .properties(properties)
                    .vector(vectors[i])
                    .build());
        }

        // Use batching for ingestion into Weaviate for performance.
        var batchResult = weaviateClient.batch().objectsBatcher()
                .withObjects(objectsToCreate.toArray(new WeaviateObject[0]))
                .run();
        
        if (batchResult.hasErrors()) {
            logger.error("Weaviate batch import failed: {}", batchResult.getError().getMessages());
        } else {
            logger.info("Successfully ingested {} chunks for document ID: {}", objectsToCreate.size(), documentId);
        }
    }
}

查询路径:融合语义与结构化过滤

查询是整个系统的价值体现。我们使用GraphQL作为API层,因为它能让前端根据需求精确地请求数据,避免了RESTful API常见的过度或不足的数据获取问题。

1. GraphQL Schema 定义

Schema是API的契约。我们定义一个hybridSearch查询,它接收一个自然语言问题,并允许附加过滤器。

schema.graphqls

type Query {
    """
    Performs a hybrid search combining semantic vector search with keyword-based filtering.
    """
    hybridSearch(question: String!, sourceFilter: [String]): [SearchResult!]
}

type SearchResult {
    documentId: String!
    chunkId: String!
    content: String!
    source: String
    score: Float! # The relevance score from the vector search
}

2. GraphQL Resolver 实现

这是Java服务中最复杂的逻辑所在。它需要:

  1. 对用户问题进行向量化。
  2. (可选) 调用NLP服务提取问题中的实体,用于结构化过滤。
  3. 构建一个同时包含向量搜索和where过滤器的Weaviate查询。
  4. 执行查询并返回格式化的结果。

SearchResolver.java

package com.yourcompany.rag.graphql;

import com.yourcompany.rag.services.EmbeddingService;
import graphql.kickstart.tools.GraphQLQueryResolver;
import io.weaviate.client.WeaviateClient;
import io.weaviate.client.v1.graphql.model.GraphQLResponse;
import io.weaviate.client.v1.graphql.query.argument.NearVectorArgument;
import io.weaviate.client.v1.graphql.query.argument.WhereArgument;
import io.weaviate.client.v1.graphql.query.argument.WhereOperator;
import io.weaviate.client.v1.graphql.query.builder.GetBuilder;
import io.weaviate.client.v1.graphql.query.fields.Field;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Component
public class SearchResolver implements GraphQLQueryResolver {

    private final WeaviateClient weaviateClient;
    private final EmbeddingService embeddingService;

    public SearchResolver(WeaviateClient weaviateClient, EmbeddingService embeddingService) {
        this.weaviateClient = weaviateClient;
        this.embeddingService = embeddingService;
    }

    public List<SearchResult> hybridSearch(String question, List<String> sourceFilter) {
        float[] questionVector = embeddingService.generateEmbeddings(List.of(question))[0];

        NearVectorArgument nearVector = NearVectorArgument.builder()
                .vector(questionVector)
                .build();

        // Dynamically build the 'where' filter if provided.
        // This is the key to hybrid search.
        WhereArgument whereFilter = null;
        if (sourceFilter != null && !sourceFilter.isEmpty()) {
            whereFilter = WhereArgument.builder()
                    .path(new String[]{"source"})
                    .operator(WhereOperator.ContainsAny)
                    .valueText(sourceFilter.toArray(new String[0]))
                    .build();
        }

        GetBuilder queryBuilder = weaviateClient.graphQL().get()
                .withClassName("DocumentChunk")
                .withFields(
                        Field.builder().name("content").build(),
                        Field.builder().name("documentId").build(),
                        Field.builder().name("chunkId").build(),
                        Field.builder().name("source").build(),
                        Field.builder().name("_additional").fields(
                                Field.builder().name("distance").build() // or certainty/score
                        ).build()
                )
                .withNearVector(nearVector)
                .withLimit(10);
        
        if (whereFilter != null) {
            queryBuilder.withWhere(whereFilter);
        }

        GraphQLResponse response = queryBuilder.run();

        if (response.hasErrors()) {
            // Log the errors properly.
            throw new RuntimeException("GraphQL query to Weaviate failed: " + response.getErrors()[0].getMessage());
        }

        Map<String, List<Map<String, Object>>> data = (Map<String, List<Map<String, Object>>>) response.getDataAsMap().get("Get");
        List<Map<String, Object>> chunks = data.get("DocumentChunk");

        return chunks.stream()
                .map(this::mapToSearchResult)
                .collect(Collectors.toList());
    }

    private SearchResult mapToSearchResult(Map<String, Object> chunkData) {
        SearchResult result = new SearchResult();
        result.setContent((String) chunkData.get("content"));
        result.setDocumentId((String) chunkData.get("documentId"));
        result.setChunkId((String) chunkData.get("chunkId"));
        result.setSource((String) chunkData.get("source"));
        
        Map<String, Object> additional = (Map<String, Object>) chunkData.get("_additional");
        result.setScore(((Number) additional.get("distance")).floatValue());

        return result;
    }
}

4. 前端集成:Solid.js

前端的角色是提供一个流畅的用户界面来消费GraphQL API。Solid.js因其基于Signal的细粒度响应式系统而非常适合这种数据驱动的应用,它能以极高的性能更新UI。

SearchComponent.jsx

import { createSignal, createResource } from "solid-js";

// A simplified GraphQL client function
async function gqlQuery(query, variables) {
  const response = await fetch('/graphql', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ query, variables }),
  });
  if (!response.ok) {
    throw new Error("Network response was not ok");
  }
  const json = await response.json();
  if (json.errors) {
    // A production app should handle this more gracefully
    throw new Error(json.errors.map(e => e.message).join('\n'));
  }
  return json.data.hybridSearch;
}

const HYBRID_SEARCH_QUERY = `
  query HybridSearch($question: String!) {
    hybridSearch(question: $question) {
      content
      documentId
      source
      score
    }
  }
`;

function SearchComponent() {
  const [question, setQuestion] = createSignal("");

  const [searchResults] = createResource(
    // The resource will refetch whenever this source signal changes
    () => question(),
    async (currentQuestion) => {
      // Don't fetch for empty or very short queries
      if (currentQuestion.length < 3) {
        return [];
      }
      return gqlQuery(HYBRID_SEARCH_QUERY, { question: currentQuestion });
    }
  );

  return (
    <div>
      <input
        type="text"
        placeholder="Ask a question about your documents..."
        onInput={(e) => setQuestion(e.currentTarget.value)}
        style={{ width: "100%", padding: "10px", "font-size": "16px" }}
      />
      
      <div>
        {searchResults.loading && <p>Searching...</p>}
        {searchResults.error && <p style={{color: "red"}}>{searchResults.error.message}</p>}
        <ul>
          <For each={searchResults()}>
            {(item) => (
              <li style={{ "margin-bottom": "15px", border: "1px solid #ccc", padding: "10px" }}>
                <p>{item.content}</p>
                <small>Source: {item.source} | Score: {item.score.toFixed(4)}</small>
              </li>
            )}
          </For>
        </ul>
      </div>
    </div>
  );
}

export default SearchComponent;

架构的局限性与未来迭代

当前这套架构并非没有缺点。最明显的是,Java到Python的同步gRPC调用在查询路径上引入了额外的网络延迟,尽管在数据摄入阶段这通常是可以接受的。对于对延迟极其敏感的在线查询场景,一个优化方向是预先处理查询,或者将部分轻量级的NLP逻辑(如关键字提取)直接在Java中实现。

另一个复杂点是运维。维护一个异构系统需要更成熟的DevOps实践,包括统一的日志聚合、跨服务的分布式追踪(例如使用OpenTelemetry)以及更复杂的部署策略。这些都不是免费的午餐,是选择这套架构时必须付出的代价。

未来的迭代可以探索几个方向:

  1. 异步化数据管道: 将数据摄入从同步的RPC调用改为基于消息队列(如Kafka)的异步事件驱动模式,可以极大地提高系统的吞吐量和韧性。
  2. 引入重排(Re-ranking)阶段: 在从Weaviate召回初步候选集后,可以引入一个更复杂的交叉编码器模型(Cross-encoder)对前K个结果进行重排序,以进一步提升精度。这可以作为另一个独立的微服务实现。
  3. 查询意图识别: 在执行搜索前,增加一个查询分类模块,用于判断用户查询是事实性的、导航性的还是探索性的,并据此动态调整检索策略,例如是更多地依赖向量搜索还是结构化过滤。

  目录