RAG Search Agent
This agent implements a Retrieval-Augmented Generation (RAG) pipeline. It demonstrates how to manage persistent vector indexes and cache search results using the Workspace.
How it Works
- Ingestion: You can send documents to be “indexed”.
- Persistence: The agent saves the document embeddings (simulated) into a vector store located in /workspace/index/.
- Search: When you ask a query, the agent (see the sketch after this list):
  - Checks the result cache in /workspace/cache/.
  - If miss, scans the persistent index.
  - Returns the most relevant document.
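As a mental model, the search path is a small cache-aside lookup over the Workspace. The sketch below is illustrative only: load_index and score are hypothetical stand-ins for the real retrieval logic (the agent under Source Code uses FAISS plus an LLM instead).

import hashlib
import json
from pathlib import Path

CACHE_DIR = Path("/workspace/cache")
INDEX_DIR = Path("/workspace/index")

def load_index(index_dir: Path) -> list:
    # Hypothetical helper: read whatever records the ingestion step wrote.
    return [json.loads(p.read_text()) for p in index_dir.glob("*.json")]

def score(doc: dict, query: str) -> int:
    # Hypothetical helper: naive keyword overlap; the real agent uses vector similarity.
    text = doc.get("text", "").lower()
    return sum(word in text for word in query.lower().split())

def search(query: str) -> dict:
    # 1. Check the result cache, keyed by a hash of the query.
    key = hashlib.sha256(query.encode()).hexdigest()
    cached = CACHE_DIR / f"{key}.json"
    if cached.exists():
        return json.loads(cached.read_text())

    # 2. Cache miss: scan the persistent index and pick the best-scoring document.
    docs = load_index(INDEX_DIR)
    best = max(docs, key=lambda doc: score(doc, query), default=None)

    # 3. Persist the result so the next identical query is a cache hit.
    result = {"query": query, "document": best}
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    cached.write_text(json.dumps(result))
    return result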
Key Features
- Persistent Index: The vector database lives in /workspace, surviving redeploys.
- Result Caching: Expensive search operations are cached to disk to save compute.
- Stateful updates: You can incrementally add documents to the index over time (see the sketch below).
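Incremental updates to a persisted FAISS index come down to load, add, save. Below is a minimal sketch using the same langchain_community FAISS API the agent relies on; the embeddings object is a placeholder that must be the same embedding model the index was originally built with, and the index path is assumed for illustration.

from pathlib import Path
from langchain_community.vectorstores import FAISS

def add_documents(texts, embeddings, index_dir="/workspace/index"):
    index_path = Path(index_dir)
    if index_path.exists():
        # Extend the existing index rather than rebuilding it from scratch.
        store = FAISS.load_local(
            str(index_path), embeddings, allow_dangerous_deserialization=True
        )
        store.add_texts(texts)
    else:
        # First write: build a fresh index from the initial batch.
        store = FAISS.from_texts(texts, embeddings)
    store.save_local(str(index_path))
    return store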
Usage
# Add a document
orpheus run rag-search '{"action": "index", "doc": "Orpheus uses warm workers."}'
# Search
orpheus run rag-search '{"action": "search", "query": "warm workers"}'
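The handler in rag_agent.py (shown below under Source Code) reads the question or query field, so for quick local testing you can also call it directly from Python; this assumes the FAISS index already exists at INDEX_DIR and a model server is reachable at MODEL_URL:

from rag_agent import handler

result = handler({"query": "warm workers"})
print(result["status"], result.get("answer") or result.get("error"))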
Source Code
- agent.yaml
- rag_agent.py
- indexer.py
- requirements.txt
agent.yaml
name: rag-search
runtime: python3
module: rag_agent
entrypoint: handler
memory: 1024
timeout: 600 # 10 min - LLM inference can be slow
# Model specification - ServiceManager handles model server lifecycle
model: mistral
engine: ollama # Using Ollama for local inference (macOS-friendly)
# Environment variables
env:
- INDEX_DIR=/agent/data/faiss_index
- EMBEDDING_MODEL=nomic-embed-text
- OLLAMA_MODEL=mistral
# Telemetry configuration - custom labels for Prometheus filtering
telemetry:
enabled: true
labels:
team: ml_platform
tier: standard
use_case: rag
# Scaling configuration
scaling:
min_workers: 1
max_workers: 20
target_utilization: 1.5
scale_up_threshold: 2.0
scale_down_threshold: 0.3
scale_up_delay: "15s"
scale_down_delay: "60s"
queue_size: 50
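The scaling block is what drives the queue-depth autoscaling described in the rag_agent.py docstring. The arithmetic below assumes the thresholds are compared against queued requests per worker; that reading follows the field names and the queue-depth notes in the agent source, but it is an interpretation, not something the config itself states.

# Illustrative only, under the assumed queued-requests-per-worker semantics.
queued_requests, workers = 40, 10
per_worker = queued_requests / workers   # 4.0 queued requests per worker

should_scale_up = per_worker > 2.0       # True: above scale_up_threshold for 15s -> add workers
should_scale_down = per_worker < 0.3     # False: scale-down needs 60s below 0.3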
rag_agent.py
#!/usr/bin/env python3
"""
RAG Agent for Orpheus: Retrieval-Augmented Generation using FAISS + LLM.
This agent demonstrates Orpheus's core value proposition:
1. Queue-Depth Autoscaling (not CPU):
- LLM inference is I/O-bound, CPU stays at ~3 millicores
- Orpheus scales on queue depth, not CPU metrics
- Result: 38x faster queue drain vs CPU-based HPA
2. Native Model Server Management (ServiceManager):
- Agent declares `model: mistral` in agent.yaml
- Orpheus auto-detects platform: macOS → Ollama, Linux+GPU → vLLM
- No Docker/k8s sidecar config, just works
The agent:
1. Retrieves relevant documents from a FAISS index
2. Generates answers using LLM (model server managed by Orpheus)
3. Returns structured results with sources and timing
"""
import os
import time
from pathlib import Path
# RAG dependencies
from langchain_community.vectorstores import FAISS
from langchain_ollama import OllamaLLM, OllamaEmbeddings
from langchain_core.prompts import PromptTemplate
# Configuration
INDEX_DIR = os.getenv("INDEX_DIR", "./data/faiss_index")
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "nomic-embed-text")
TOP_K = int(os.getenv("TOP_K", "4"))
# Model endpoint - ServiceManager injects MODEL_URL automatically
# Fallback to OLLAMA_BASE_URL for manual override or local testing
MODEL_URL = os.getenv("MODEL_URL") or os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
MODEL_NAME = os.getenv("OLLAMA_MODEL", "mistral")
# RAG prompt template
RAG_PROMPT = PromptTemplate(
input_variables=["context", "question"],
template="""You are a helpful assistant that answers questions based on the provided context.
Use only the information from the context to answer. If the context doesn't contain
enough information to answer, say "I don't have enough information to answer this question."
Context:
{context}
Question: {question}
Answer:""",
)
class RAGEngine:
"""RAG engine that retrieves context and generates answers."""
def __init__(self):
self.vectorstore = None
self.llm = None
self._initialized = False
self._timings = {}
def initialize(self):
"""Load the FAISS index and initialize the LLM."""
if self._initialized:
return
# Load embeddings model (Ollama-based, HTTP request to model server)
print(f"[rag] Using Ollama embeddings: {EMBEDDING_MODEL} at {MODEL_URL}")
embeddings = OllamaEmbeddings(
base_url=MODEL_URL,
model=EMBEDDING_MODEL,
)
# Load FAISS index
index_path = Path(INDEX_DIR)
if not index_path.exists():
raise FileNotFoundError(
f"FAISS index not found at {INDEX_DIR}. "
"Copy the index to /workspace/faiss_index or set INDEX_DIR."
)
print(f"[rag] Loading FAISS index from {INDEX_DIR}")
start = time.time()
self.vectorstore = FAISS.load_local(
str(index_path),
embeddings,
allow_dangerous_deserialization=True,
)
self._timings["index_load"] = time.time() - start
# Initialize LLM - endpoint provided by Orpheus ServiceManager
print(f"[rag] Connecting to model server at {MODEL_URL} (model: {MODEL_NAME})")
self.llm = OllamaLLM(
base_url=MODEL_URL,
model=MODEL_NAME,
temperature=0.1,
)
self._initialized = True
print(f"[rag] RAG engine initialized (index load: {self._timings['index_load']:.2f}s)")
def retrieve(self, question: str, top_k: int = None) -> tuple:
"""Retrieve relevant documents for a question."""
if not self._initialized:
self.initialize()
top_k = top_k or TOP_K
start_time = time.time()
docs = self.vectorstore.similarity_search(question, k=top_k)
elapsed = time.time() - start_time
print(f"[rag] Retrieved {len(docs)} docs in {elapsed:.3f}s")
return docs, elapsed
def generate(self, question: str, context_docs: list) -> tuple:
"""Generate an answer using the LLM."""
if not self._initialized:
self.initialize()
# Format context from retrieved documents
context = "\n\n---\n\n".join(
[f"[{i+1}] {doc.page_content}" for i, doc in enumerate(context_docs)]
)
# Create prompt
prompt = RAG_PROMPT.format(context=context, question=question)
# Generate response
start_time = time.time()
response = self.llm.invoke(prompt)
elapsed = time.time() - start_time
print(f"[rag] LLM generated response in {elapsed:.3f}s")
return response, elapsed
def answer(self, question: str) -> dict:
"""Full RAG pipeline: retrieve + generate."""
if not self._initialized:
self.initialize()
total_start = time.time()
# Retrieve relevant context
docs, retrieval_time = self.retrieve(question)
# Generate answer
answer, llm_time = self.generate(question, docs)
total_time = time.time() - total_start
return {
"question": question,
"answer": answer,
"sources": [
{
"content": doc.page_content[:200] + "...",
"metadata": doc.metadata,
}
for doc in docs
],
"timing": {
"retrieval_ms": int(retrieval_time * 1000),
"llm_ms": int(llm_time * 1000),
"total_ms": int(total_time * 1000),
},
}
# Global RAG engine instance (persists across invocations while the worker stays warm)
_engine = None
def get_engine() -> RAGEngine:
"""Get or create the global RAG engine instance."""
global _engine
if _engine is None:
_engine = RAGEngine()
return _engine
def handler(input_data: dict) -> dict:
"""
Orpheus agent handler for RAG queries.
Input:
{"question": "What is X?"}
or
{"query": "What is X?"} # alias
Output:
{
"status": "success",
"agent": "rag-agent",
"answer": "...",
"sources": [...],
"timing": {...}
}
"""
try:
# Get question from input (support both 'question' and 'query')
question = input_data.get("question") or input_data.get("query")
if not question:
return {
"status": "error",
"agent": "rag-agent",
"error": "Missing 'question' or 'query' field in input",
}
# Get RAG engine and process question
engine = get_engine()
result = engine.answer(question)
return {
"status": "success",
"agent": "rag-agent",
"answer": result["answer"],
"sources": result["sources"],
"timing": result["timing"],
}
except FileNotFoundError as e:
return {
"status": "error",
"agent": "rag-agent",
"error": str(e),
"hint": "Ensure FAISS index is available at INDEX_DIR",
}
except Exception as e:
return {
"status": "error",
"agent": "rag-agent",
"error": str(e),
}
indexer.py
#!/usr/bin/env python3
"""
Indexer: Builds FAISS vector index from documents in ./data/docs.
Persists index to disk so workers don't need to re-embed every start.
"""
import os
import sys
from pathlib import Path
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
# Configuration
DOCS_DIR = os.getenv("DOCS_DIR", "./data/docs")
INDEX_DIR = os.getenv("INDEX_DIR", "./data/faiss_index")
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "sentence-transformers/all-MiniLM-L6-v2")
CHUNK_SIZE = int(os.getenv("CHUNK_SIZE", "500"))
CHUNK_OVERLAP = int(os.getenv("CHUNK_OVERLAP", "50"))
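# NOTE: the index must be built with the same embedding model the agent uses at query
# time. agent.yaml points the agent at Ollama's nomic-embed-text, while the default
# here is a HuggingFace sentence-transformers model; querying an index built with a
# different model (or vector dimension) yields failing or meaningless retrievals.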
def load_documents(docs_dir: str) -> list:
"""Load all markdown and text files from the docs directory."""
docs_path = Path(docs_dir)
if not docs_path.exists():
raise FileNotFoundError(f"Documents directory not found: {docs_dir}")
# Load markdown files
md_loader = DirectoryLoader(
str(docs_path),
glob="**/*.md",
loader_cls=TextLoader,
loader_kwargs={"encoding": "utf-8"},
)
# Load text files
txt_loader = DirectoryLoader(
str(docs_path),
glob="**/*.txt",
loader_cls=TextLoader,
loader_kwargs={"encoding": "utf-8"},
)
docs = md_loader.load() + txt_loader.load()
print(f"[indexer] Loaded {len(docs)} documents from {docs_dir}")
return docs
def split_documents(docs: list) -> list:
"""Split documents into chunks for embedding."""
splitter = RecursiveCharacterTextSplitter(
chunk_size=CHUNK_SIZE,
chunk_overlap=CHUNK_OVERLAP,
separators=["\n\n", "\n", ". ", " ", ""],
)
chunks = splitter.split_documents(docs)
print(
f"[indexer] Split into {len(chunks)} chunks (size={CHUNK_SIZE}, overlap={CHUNK_OVERLAP})"
)
return chunks
def create_embeddings():
"""Initialize the embedding model."""
print(f"[indexer] Loading embedding model: {EMBEDDING_MODEL}")
embeddings = HuggingFaceEmbeddings(
model_name=EMBEDDING_MODEL,
model_kwargs={"device": "cpu"},
encode_kwargs={"normalize_embeddings": True},
)
return embeddings
def build_index(chunks: list, embeddings) -> FAISS:
"""Build FAISS index from document chunks."""
print(f"[indexer] Building FAISS index from {len(chunks)} chunks...")
vectorstore = FAISS.from_documents(
documents=chunks,
embedding=embeddings,
)
print("[indexer] FAISS index built successfully")
return vectorstore
def save_index(vectorstore: FAISS, index_dir: str):
"""Save FAISS index to disk."""
index_path = Path(index_dir)
index_path.mkdir(parents=True, exist_ok=True)
vectorstore.save_local(str(index_path))
print(f"[indexer] Index saved to {index_dir}")
def main():
"""Main indexer entry point."""
print("=" * 60)
print("RAG Indexer - Building FAISS Vector Index")
print("=" * 60)
# Load documents
docs = load_documents(DOCS_DIR)
if not docs:
print("[indexer] ERROR: No documents found!")
sys.exit(1)
# Split into chunks
chunks = split_documents(docs)
# Create embeddings model
embeddings = create_embeddings()
# Build index
vectorstore = build_index(chunks, embeddings)
# Save to disk
save_index(vectorstore, INDEX_DIR)
print("=" * 60)
print("Indexing complete!")
print(f" Documents: {len(docs)}")
print(f" Chunks: {len(chunks)}")
print(f" Index location: {INDEX_DIR}")
print("=" * 60)
if __name__ == "__main__":
main()
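Run the indexer once before deploying the agent (and re-run it whenever the source documents change) so workers load a prebuilt index instead of re-embedding on every start. With the defaults above that is simply:

DOCS_DIR=./data/docs INDEX_DIR=./data/faiss_index python indexer.py

Make sure the resulting index directory ends up wherever the agent's INDEX_DIR (set in agent.yaml) points at runtime.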
requirements.txt
langchain>=0.1.0
langchain-community>=0.0.10
langchain-ollama>=0.0.1
langchain-text-splitters>=0.0.1
faiss-cpu>=1.7.0
# Needed by indexer.py (HuggingFace embeddings for building the index)
langchain-huggingface>=0.0.1
sentence-transformers>=2.2.0

