# Use Case: Production RAG Chatbot
**TL;DR:** Build a production-ready RAG chatbot that combines document retrieval with user personalization, all in ~100 lines of Python.
## The Goal
Build a customer support chatbot that:
- Searches knowledge base for relevant docs
- Personalizes responses based on user tier and history
- Updates in real-time when docs change
- Stays within token limits for cost control
## Architecture Overview
```mermaid
graph TD
    User[User Query] --> API[FastAPI]
    API --> Context[Context Assembly]
    Context --> Docs[Doc Retriever]
    Context --> Features[User Features]
    Docs --> Vector[(pgvector)]
    Features --> Redis[(Redis)]
    Context --> LLM[OpenAI/Anthropic]
    LLM --> Response[Response]
```

## Step 1: Define Entities and Features
```python
# chatbot.py
from datetime import timedelta

from fabra.core import FeatureStore, entity, feature

store = FeatureStore()

@entity(store)
class User:
    user_id: str

@entity(store)
class Document:
    doc_id: str

# User features for personalization
@feature(entity=User, materialize=True, refresh=timedelta(hours=1))
def user_tier(user_id: str) -> str:
    # From your database, e.g.:
    # return db.get_user_tier(user_id)
    return "premium"

@feature(entity=User, materialize=True)
def support_history(user_id: str) -> list[str]:
    # Last 5 support tickets
    return ["Ticket #101", "Ticket #102"]

@feature(entity=User, trigger="message_sent")
def message_count(user_id: str, event) -> int:
    return 1  # Simplified for example
```
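To sanity-check these definitions, you can read the materialized features back with the same `get_online_features` call that Step 4 uses. A minimal sketch; the `user_123` ID is just a placeholder:

```python
# verify_features.py - quick sanity check using the Step 4 lookup API
import asyncio

from chatbot import store

async def main() -> None:
    # Fetch the two materialized User features for a test user
    feats = await store.get_online_features(
        "User", "user_123", ["user_tier", "support_history"]
    )
    print(feats)  # e.g. {"user_tier": "premium", "support_history": [...]}

if __name__ == "__main__":
    asyncio.run(main())
```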
## Step 2: Index Your Knowledge Base

```python
# index_docs.py
import asyncio

from chatbot import store

async def index_knowledge_base():
    docs = [
        {"id": "doc_1", "text": "To reset your password...", "category": "account"},
    ]
    for doc in docs:
        await store.index(
            index_name="knowledge_base",
            entity_id=doc["id"],
            text=doc["text"],
            metadata={"category": doc["category"]},
        )
    print(f"Indexed {len(docs)} documents")

if __name__ == "__main__":
    asyncio.run(index_knowledge_base())
```
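In practice your documents usually live on disk or in a CMS rather than inline. A sketch that walks a `docs/` directory (a hypothetical layout) and indexes each Markdown file with the same `store.index` call:

```python
# index_docs_from_disk.py - bulk-index Markdown files (hypothetical docs/ layout)
import asyncio
from pathlib import Path

from chatbot import store

async def index_directory(root: str = "docs") -> None:
    paths = sorted(Path(root).glob("**/*.md"))
    for path in paths:
        await store.index(
            index_name="knowledge_base",
            entity_id=path.stem,                      # e.g. "reset-password"
            text=path.read_text(encoding="utf-8"),
            metadata={"category": path.parent.name},  # folder name as category
        )
    print(f"Indexed {len(paths)} documents")

if __name__ == "__main__":
    asyncio.run(index_directory())
```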
## Step 3: Define Retrievers

```python
# chatbot.py (continued)
from fabra.retrieval import retriever

@retriever(name="knowledge_base", cache_ttl=300)
async def search_docs(query: str) -> list[str]:
    # Logic to search pgvector, e.g.:
    # return await store.vector_search("knowledge_base", query)
    return ["Fabra simplifies RAG.", "Context Store manages tokens."]
```
## Step 4: Assemble Context

```python
# chatbot.py (continued)
from fabra.context import context, Context, ContextItem

@context(max_tokens=4000)
async def chat_context(user_id: str, query: str) -> Context:
    # Fetch all components
    docs = await search_docs(query)

    # Get features (returns a dict)
    user_features = await store.get_online_features(
        "User", user_id, ["user_tier", "support_history"]
    )
    tier = user_features.get("user_tier")
    history = user_features.get("support_history", [])

    items = [
        ContextItem(content="You are a helpful assistant.", priority=0, required=True),
        ContextItem(content=f"User tier: {tier}", priority=1, required=True),
        ContextItem(content="Docs:\n" + "\n".join(docs), priority=2, required=True),
    ]

    # Add history for premium users
    if tier == "premium" and history:
        items.append(ContextItem(
            content="History:\n" + "\n".join(history),
            priority=3,
            required=False,  # Dropped first if the token budget is exceeded
        ))

    return items  # The returned list is automatically wrapped in a Context
```
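Because only `required=False` items can be shed when the 4,000-token budget is exceeded, it is worth inspecting what actually survived assembly. A quick sketch using the `ctx.content` accessor that Step 5 relies on:

```python
import asyncio

from chatbot import chat_context

async def main() -> None:
    ctx = await chat_context("user_123", "How do I reset my password?")
    # ctx.content is the final flattened prompt string; the optional
    # History block is absent if the budget forced it to be dropped.
    print(ctx.content)

if __name__ == "__main__":
    asyncio.run(main())
```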
## Step 5: Create the API

```python
# chatbot.py (continued)
from fastapi import FastAPI
from openai import AsyncOpenAI
from pydantic import BaseModel

app = FastAPI()
client = AsyncOpenAI()

class ChatRequest(BaseModel):
    user_id: str
    message: str

class ChatResponse(BaseModel):
    response: str
    context_used: str

@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    # Get assembled context
    ctx = await chat_context(request.user_id, request.message)

    # Call the LLM with the assembled context as the system prompt
    response = await client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": ctx.content},  # final flattened string
            {"role": "user", "content": request.message},
        ],
    )
    return ChatResponse(
        response=response.choices[0].message.content,
        context_used=ctx.content,
    )
```
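The architecture diagram lists Anthropic as an alternative backend, and the handler body translates directly. A sketch using the `anthropic` SDK; the model name is an example only:

```python
from anthropic import AsyncAnthropic

anthropic_client = AsyncAnthropic()

async def call_anthropic(ctx_content: str, user_message: str) -> str:
    # Same pattern: the assembled context goes in as the system prompt
    msg = await anthropic_client.messages.create(
        model="claude-3-5-sonnet-20241022",  # example model name
        max_tokens=1024,
        system=ctx_content,
        messages=[{"role": "user", "content": user_message}],
    )
    return msg.content[0].text
```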
## Step 6: Real-Time Doc Updates

Keep context fresh when documents change:
```python
# chatbot.py (continued)
from fabra.events import AxiomEvent

@feature(entity=Document, trigger="doc_updated")
async def doc_content(doc_id: str, event: AxiomEvent) -> str:
    content = event.payload["content"]
    # Re-index in the vector store
    await store.index(
        index_name="knowledge_base",
        entity_id=doc_id,
        text=content,
        metadata=event.payload.get("metadata", {}),
    )
    return content
```

Publish updates from your CMS:
```python
# cms_webhook.py
from fabra.bus import RedisEventBus
from fabra.events import AxiomEvent

bus = RedisEventBus()

async def on_doc_save(doc_id: str, content: str, metadata: dict):
    await bus.publish(AxiomEvent(
        event_type="doc_updated",
        entity_id=doc_id,
        payload={"content": content, "metadata": metadata},
    ))
```
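If the CMS delivers saves over HTTP, a small webhook endpoint can forward them to `on_doc_save`. A sketch; the route path and payload model are hypothetical:

```python
# cms_webhook.py (continued)
from fastapi import FastAPI
from pydantic import BaseModel

webhook_app = FastAPI()

class DocSavePayload(BaseModel):
    doc_id: str
    content: str
    metadata: dict = {}

@webhook_app.post("/webhooks/doc-saved")  # hypothetical route
async def doc_saved(payload: DocSavePayload):
    # Forward the CMS save onto the event bus; the doc_content feature
    # above re-indexes the document when the event is processed.
    await on_doc_save(payload.doc_id, payload.content, payload.metadata)
    return {"status": "published"}
```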
## Step 7: Run It

```bash
# Terminal 1: Start server
fabra serve chatbot.py

# Terminal 2: Start worker (for event processing)
fabra worker chatbot.py

# Terminal 3: Test it
curl -X POST http://localhost:8000/chat \
  -H "Content-Type: application/json" \
  -d '{"user_id": "user_123", "message": "How do I reset my password?"}'
```
## Production Considerations

### Token Budget by Tier
```python
@context(max_tokens=4000)
async def chat_context(user_id: str, query: str) -> Context:
    tier = await store.get_feature("user_tier", user_id)

    # Premium gets more context
    budget = 8000 if tier == "premium" else 4000
    top_k = 5 if tier == "premium" else 3

    docs = (await search_docs(query))[:top_k]  # keep only the top_k results

    return Context(
        items=[...],
        max_tokens=budget,
    )
```
@app.post("/chat")
async def chat(request: ChatRequest):
count = await store.get_feature("message_count", request.user_id) or 0
tier = await store.get_feature("user_tier", request.user_id)
limit = 100 if tier == "premium" else 20
if count >= limit:
raise HTTPException(429, "Rate limit exceeded")
# Continue with chat...Observability
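For `message_count` to move at all, something has to publish `message_sent` events (the trigger named in Step 1). A sketch reusing the `RedisEventBus` pattern from Step 6:

```python
from fabra.bus import RedisEventBus
from fabra.events import AxiomEvent

bus = RedisEventBus()

async def record_message(user_id: str) -> None:
    # Fires the trigger="message_sent" feature defined in Step 1
    await bus.publish(AxiomEvent(
        event_type="message_sent",
        entity_id=user_id,
        payload={},
    ))
```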
### Observability

```python
# Debug context assembly
@app.get("/debug/context/{user_id}")
async def debug_context(user_id: str, query: str):
    trace = await store.explain_context("chat_context", user_id=user_id, query=query)
    return trace
```
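Since `query` has no path segment, FastAPI treats it as a query parameter, so you can hit the route with the same curl pattern as Step 7:

```bash
curl "http://localhost:8000/debug/context/user_123?query=How+do+I+reset+my+password"
```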
## Full Code

See the complete example at: `examples/rag_chatbot.py`
## Metrics
Typical latencies with this setup:
| Metric | Value |
|---|---|
| Context assembly latency | ~50ms |
| Vector search latency | ~20ms (cached: ~2ms) |
| Feature retrieval | ~5ms (Redis) |
| Total overhead | ~75ms |
## Next Steps
- Context Store Overview: Deep dive into RAG infrastructure
- Retrievers: Advanced search patterns
- Event-Driven Features: Real-time updates
- Architecture: System design