はじめに

Open WebUIとVertex AI Searchの連携は、生成AIの活用を大きく広げる可能性を秘めています。しかし、現時点では公式なパイプラインが公開されていないため、自力でパイプラインを作成する必要があります。本記事では、Open WebUIとVertex AI Searchを連携させるためのカスタムパイプラインの作成方法を解説します。

Open WebUIとは?
Open WebUI は、大規模言語モデル(LLM) をローカル環境で手軽に利用するための、Web ベースのインターフェース です。

※詳しくはこちらを参照下さい。
Open WebUI
Gitソース

導入手順

※Open WebUIのソースをGitからclone済み

① Open WebUIのコンテナに必要なモジュールと環境変数をセットする

docker-compose.yaml

services:
  ollama:
    volumes:
      - ollama:/root/.ollama
    container_name: ollama
    pull_policy: always
    tty: true
    restart: unless-stopped
    image: ollama/ollama:${OLLAMA_DOCKER_TAG-latest}

open-webui:
  build:
    context: .
    args:
      OLLAMA_BASE_URL: '/ollama'
    dockerfile: Dockerfile
  image: ghcr.io/open-webui/open-webui:${WEBUI_DOCKER_TAG-main}
  container_name: open-webui-test
  volumes:
    - open-webui:/app/backend/data
    - ${GOOGLE_APPLICATION_CREDENTIALS}:/tmp/keys/key.json:ro
  depends_on:
    - ollama
  ports:
    - ${OPEN_WEBUI_PORT-3000}:8080
  environment:
    - 'OLLAMA_BASE_URL=http://ollama:11434'
    - 'WEBUI_SECRET_KEY='
    - 'GCLOUD_PROJECT=${GCLOUD_PROJECT}'
    - 'GOOGLE_APPLICATION_CREDENTIALS=/tmp/keys/key.json'
  extra_hosts:
    - host.docker.internal:host-gateway
  restart: unless-stopped

volumes:
  ollama: {}
  open-webui: {}

.env

GCLOUD_PROJECT={使用しているプロジェクト}
GOOGLE_API_KEY={APIキー}
GOOGLE_APPLICATION_CREDENTIALS=/{認証情報が置いてあるpath}/application_default_credentials.json

GOOGLE_API_KEY取得手順
Gemini Developer API

cretcredentials取得手順
※参照 アプリケーションのデフォルト認証情報を設定する

backend/requirements.txt

langchain-google-community==1.0.7
langchain-google-vertexai==1.0.10
langchain-google-community[vertexaisearch]
※追加

② Pipeをセットする

※今回はVertex AI SearchにLang Chainを使って接続

Vertex AI Search接続用のPipe

conectVertexAISearch.py

import os
from pydantic import BaseModel, Field
from typing import List, Union, Iterator
from langchain_google_community import VertexAISearchRetriever
from langchain_core.prompts import ChatPromptTemplate
from textwrap import dedent
from langchain_google_vertexai import ChatVertexAI
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
import google.generativeai as genai

# Set DEBUG to True to enable detailed logging
DEBUG = True
PROJECT_ID = os.getenv("GCLOUD_PROJECT")  # Set to your Project ID

class Pipe:
    class Valves(BaseModel):
        GOOGLE_API_KEY: str = Field(default="")
        STORE_ID: str = Field(default="")


    def __init__(self):
        self.id = "self-mode"
        self.type = "api connect"
        self.name = "Vertex AI Search connect:"
        self.valves = self.Valves(
            **{
                "GOOGLE_API_KEY": os.getenv("GOOGLE_API_KEY", ""),
            }
        )

    def get_vertex_aisearch_google_models(self):
        if not self.valves.GOOGLE_API_KEY:
            return [
                {
                    "id": "error",
                    "name": "GOOGLE_API_KEY is not set. Please update the API Key in the valves.",
                }
            ]
        try:
            genai.configure(api_key=self.valves.GOOGLE_API_KEY)
            models = genai.list_models()
            valid_model_names = ["gemini-1.5-pro", "gemini-1.0-pro-001", "gemini-1.0-pro", "gemini-1.0-pro-002",
                                 "gemini-1.5-flash-001"]
            return [
                {
                    "id": model.name[7:],  # remove the "models/" part
                    "name": model.display_name,
                }
                for model in models
                if "generateContent" in model.supported_generation_methods
                if model.name[7:] in valid_model_names
            ]
        except Exception as e:
            if DEBUG:
                print(f"Error fetching Google models: {e}")
            return [
                {"id": "error", "name": f"Could not fetch models from Google: {str(e)}"}
            ]

    def pipes(self) -> List[dict]:
        return self.get_vertex_aisearch_google_models()

    def pipe(self, body: dict) -> Union[str, Iterator[str]]:
        if not self.valves.STORE_ID:
            return f"Error: STORE_ID is not set. Please update the STORE_ID in the valves."

        try:
            retriever = self.get_retriever(self.valves.STORE_ID)

            system_prompt: str = """
                                 {任意のシステムプロンプト}
                                 """

            premise = dedent(system_prompt).strip()

            prompt = ChatPromptTemplate.from_template(
                dedent(
                    """
                    Premise: {premise}

                    Context: {context}

                    Question: {question}
                    """
                ).strip(),
                partial_variables={"premise": premise},
            )

            model_id = self.setModel(body)

            if not model_id.startswith("gemini-"):
                return f"Error: Invalid model name format: {model_id}"

            llm = ChatVertexAI(model_name=model_id, temperature=0)

            def format_docs(docs):
                return "\n\n".join(doc.page_content for doc in docs)

            chain = (
                    {"context": retriever | format_docs, "question": RunnablePassthrough()}
                    | prompt
                    | llm
                    | StrOutputParser()
            )

            return chain.stream(self.setQuery(body))

        except Exception as e:
            if DEBUG:
                print(f"Error in pipe method: {e}")
            return f"Error: {e}"

    @staticmethod
    def setQuery(body):
        messages = body["messages"]
        history = "\n".join([msg["content"] for msg in messages[:-1]])
        current_query = messages[-1]["content"]
        return f"History:\n{history}\n\nCurrent Query:\n{current_query}"

    @staticmethod
    def setModel(body):
        model_id = body.get("model", "gemini-1.5-pro")

        if "." in model_id:
            model_id = model_id.split(".", 1)[-1]

        return model_id

    @staticmethod
    def get_retriever(store_id: str) -> VertexAISearchRetriever:
        return VertexAISearchRetriever(
            project_id=PROJECT_ID,
            data_store_id=store_id,
            location_id='global',
            max_documents=5,
        )

※ValvesのGOOGLE_API_KEY/STORE_ID入力必須

おまけ

Open WebUIからVertex AI Searchを搭載したバックエンドにAPIで接続するPipe

connect_api.py

import requests
import os
from pydantic import BaseModel, Field
from typing import List, Union, Iterator
import google.generativeai as genai

# Set DEBUG to True to enable detailed logging
DEBUG = True
class Pipe:
    class Valves(BaseModel):
        CONNECT_API_URL: str = Field(default="")
        GOOGLE_API_KEY: str = Field(default="")
        STORE_ID: str = Field(default="")
        PROJECT_ID: str = Field(default="")

    def __init__(self):
        self.id = "self-mode"
        self.type = "api connect"
        self.name = "API connect: "
        self.valves = self.Valves(
            **{
                "GOOGLE_API_KEY": os.getenv("GOOGLE_API_KEY", ""),
            }
        )

    def get_vertex_aisearch_google_models(self):
        if not self.valves.GOOGLE_API_KEY:
            return [
                {
                    "id": "error",
                    "name": "GOOGLE_API_KEY is not set. Please update the API Key in the valves.",
                }
            ]
        try:
            genai.configure(api_key=self.valves.GOOGLE_API_KEY)
            models = genai.list_models()
            valid_model_names = ["gemini-1.5-pro", "gemini-1.0-pro-001", "gemini-1.0-pro", "gemini-1.0-pro-002",
                                 "gemini-1.5-flash-001"]
            return [
                {
                    "id": model.name[7:],  # remove the "models/" part
                    "name": model.display_name,
                }
                for model in models
                if "generateContent" in model.supported_generation_methods
                if model.name[7:] in valid_model_names
            ]
        except Exception as e:
            if DEBUG:
                print(f"Error fetching Google models: {e}")
            return [
                {"id": "error", "name": f"Could not fetch models from Google: {str(e)}"}
            ]

    def pipes(self) -> List[dict]:
        return self.get_vertex_aisearch_google_models()

    def pipe(self, body: dict) -> Union[str, Iterator[str]]:

        for key in ['CONNECT_API_URL', 'STORE_ID', 'PROJECT_ID', 'GOOGLE_API_KEY']:
            if not getattr(self.valves, key):
                return f"Error: {key} is not set. Please update the {key} in the valves."

        try:
            r = requests.post(
                self.valves.CONNECT_API_URL,
                json={"body": body, "valves": {
                    'PROJECT_ID':  self.valves.PROJECT_ID,
                    'STORE_ID':  self.valves.STORE_ID,
                }},
                stream=True
            )
            if r.status_code == 200:
                return (line.decode('utf-8') + '\n' for line in r.iter_lines(chunk_size=10))

            else:
                return "Failed. status code: " + str(r.status_code)

        except Exception as e:
            if DEBUG:
                print(f"Error in pipe method: {e}")
            return f"Error: {e}"</pre>

※バックエンドの仕様によって、APIへ渡すパラメーターは適宜変更して下さい。
※バックエンドで処理する形にするとOpen WebUI側にモジュールのインストールが不要となるので、
Open WebUI側に手を入れたくない方はこちらがおすすめです。

まとめ

Open WebUIからVertex AI Searchを組み合わせることで、よりマネージドで高度な機能を備えたRAGシステムの構築が可能かと思います。今回ご紹介したPipeを軸として、要件に合わせて処理を追加・変更していくことが可能なのでより柔軟で幅の広いシステム構築が実現していけると思います。