Conversational Memory Agent
This agent demonstrates stateful interactions using both Session Affinity and Workspace Persistence. It remembers the chat history for a specific user.
How it Works
- Session Routing: Orpheus routes requests that carry the same --session ID to the same worker whenever possible.
- Workspace Persistence: Every message is written to a SQLite database under /workspace/, so if the worker restarts it reloads the conversation history from the workspace (see the sketch after this list).
- This keeps the conversation durable even if the node crashes.
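A minimal sketch of the reload path, assuming the SQLite file and conversations schema that agent.py (below) creates:
import sqlite3
from pathlib import Path

# Same workspace location that agent.py writes to
DB_PATH = Path("/workspace") / "patient_sessions.db"

def reload_history(session_id: str, limit: int = 10):
    """Rebuild recent conversation context from the workspace DB after a restart."""
    conn = sqlite3.connect(str(DB_PATH))
    rows = conn.execute(
        "SELECT role, message FROM conversations"
        " WHERE session_id = ? ORDER BY timestamp DESC LIMIT ?",
        (session_id, limit),
    ).fetchall()
    conn.close()
    return list(reversed(rows))  # oldest first, ready to feed back into the prompt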
Key Features
- Session ID: You pass a session_id field (or use the CLI --session flag).
- Durable State: Conversation history and sentiment scores are stored in SQLite on the workspace volume, so they survive worker restarts.
- Context Window: The agent “remembers” what you said in previous turns.
Usage
# Start a conversation
orpheus run conversational-memory '{"message": "My name is Alice"}' --session user-123
# Follow up (it remembers!)
orpheus run conversational-memory '{"message": "What is my name?"}' --session user-123
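The handler also accepts an action field; for example, you can request a progress report for the session (handled by show_progress in agent.py):
# Ask for a progress report (sentiment trend, interaction counts)
orpheus run conversational-memory '{"action": "show_progress"}' --session user-123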
Source Code
agent.yaml
name: conversational-memory
runtime: python3
module: agent
entrypoint: handler
memory: 512
timeout: 120
# Model server via ServiceManager
model: TinyLlama/TinyLlama-1.1B-Chat-v1.0
engine: vllm # vLLM for GPU inference
env:
- WORKSPACE_DIR=/workspace
- MODEL_URL=${MODEL_URL:-http://localhost:8000} # vLLM server endpoint
# Telemetry configuration - custom labels for Prometheus filtering
telemetry:
enabled: true
labels:
team: ml_platform
tier: standard
use_case: conversational
# Session affinity CRITICAL for this agent
session:
enabled: true
key: "X-Session-ID"
ttl: "2h"
wait_timeout: "200ms"
scaling:
min_workers: 1
max_workers: 5
agent.py
#!/usr/bin/env python3
"""
Conversational Therapist Agent for Orpheus
Demonstrates:
- Session affinity (same patient → same worker)
- Workspace persistence (SQLite survives container death)
- Long-term memory (emotional patterns over time)
- Self-contained (local model served via ServiceManager, no API keys)
To see workspace persistence in action:
1. Send message → get empathetic response
2. Kill container mid-conversation
3. Resume → therapist remembers everything
Workspace state persists across container restarts.
"""
import os
import json
import sqlite3
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Tuple
import requests
from textblob import TextBlob
# Configuration
WORKSPACE_DIR = Path(os.getenv("WORKSPACE_DIR", "/workspace"))
DB_PATH = WORKSPACE_DIR / "patient_sessions.db"
# Model endpoint - ServiceManager injects MODEL_URL automatically
MODEL_URL = os.getenv("MODEL_URL") or os.getenv("OLLAMA_BASE_URL", "")
MODEL_NAME = os.getenv("OLLAMA_MODEL", "TinyLlama/TinyLlama-1.1B-Chat-v1.0")
def init_db():
"""Initialize SQLite database with schema."""
WORKSPACE_DIR.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(DB_PATH))
cursor = conn.cursor()
# Patient sessions table
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS sessions (
session_id TEXT PRIMARY KEY,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_active TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
total_interactions INTEGER DEFAULT 0
)
"""
)
# Conversation history table
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS conversations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
role TEXT,
message TEXT,
sentiment_score REAL,
sentiment_label TEXT,
FOREIGN KEY (session_id) REFERENCES sessions(session_id)
)
"""
)
# Topics mentioned table
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS topics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT,
topic TEXT,
frequency INTEGER DEFAULT 1,
first_mentioned TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_mentioned TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (session_id) REFERENCES sessions(session_id)
)
"""
)
conn.commit()
conn.close()
def ensure_session_exists(session_id: str):
"""Create session if it doesn't exist."""
conn = sqlite3.connect(str(DB_PATH))
cursor = conn.cursor()
cursor.execute(
"""
INSERT OR IGNORE INTO sessions (session_id, created_at, last_active, total_interactions)
VALUES (?, ?, ?, 0)
""",
(session_id, datetime.now(), datetime.now()),
)
conn.commit()
conn.close()
def update_session_activity(session_id: str):
"""Update session last_active timestamp and increment interactions."""
conn = sqlite3.connect(str(DB_PATH))
cursor = conn.cursor()
cursor.execute(
"""
UPDATE sessions
SET last_active = ?,
total_interactions = total_interactions + 1
WHERE session_id = ?
""",
(datetime.now(), session_id),
)
conn.commit()
conn.close()
def analyze_sentiment(text: str) -> Tuple[float, str]:
"""Analyze sentiment using TextBlob (simple, no ML model)."""
blob = TextBlob(text)
polarity = blob.sentiment.polarity # -1.0 to 1.0
if polarity > 0.1:
label = "positive"
elif polarity < -0.1:
label = "negative"
else:
label = "neutral"
return polarity, label
def add_conversation(session_id: str, role: str, message: str, sentiment: float):
"""Store conversation message with sentiment."""
conn = sqlite3.connect(str(DB_PATH))
cursor = conn.cursor()
    # Derive the stored label from the sentiment score (0.0 → neutral)
    label = "positive" if sentiment > 0.1 else "negative" if sentiment < -0.1 else "neutral"
cursor.execute(
"""
INSERT INTO conversations (session_id, role, message, sentiment_score, sentiment_label)
VALUES (?, ?, ?, ?, ?)
""",
(session_id, role, message, sentiment, label),
)
conn.commit()
conn.close()
def get_conversation_history(session_id: str, limit: int = 10) -> List[Tuple]:
"""Retrieve recent conversation history in chronological order."""
conn = sqlite3.connect(str(DB_PATH))
cursor = conn.cursor()
cursor.execute(
"""
SELECT role, message, sentiment_label, timestamp
FROM conversations
WHERE session_id = ?
ORDER BY timestamp DESC
LIMIT ?
""",
(session_id, limit),
)
rows = cursor.fetchall()
conn.close()
return list(reversed(rows)) # Return in chronological order
def get_session_stats(session_id: str) -> Dict:
"""Calculate session statistics and emotional trends."""
conn = sqlite3.connect(str(DB_PATH))
cursor = conn.cursor()
# Basic session info
cursor.execute(
"""
SELECT created_at, last_active, total_interactions
FROM sessions
WHERE session_id = ?
""",
(session_id,),
)
session_row = cursor.fetchone()
if not session_row:
conn.close()
return {"error": "Session not found"}
created_at, last_active, total_interactions = session_row
# Sentiment statistics
cursor.execute(
"""
SELECT AVG(sentiment_score), COUNT(*)
FROM conversations
WHERE session_id = ? AND role = 'patient'
""",
(session_id,),
)
avg_sentiment, message_count = cursor.fetchone()
# Sentiment trend (first 3 vs last 3 messages)
cursor.execute(
"""
SELECT sentiment_score
FROM conversations
WHERE session_id = ? AND role = 'patient'
ORDER BY timestamp
""",
(session_id,),
)
all_sentiments = [row[0] for row in cursor.fetchall()]
trend = "stable"
if len(all_sentiments) >= 6:
early_avg = sum(all_sentiments[:3]) / 3
recent_avg = sum(all_sentiments[-3:]) / 3
if recent_avg > early_avg + 0.2:
trend = "improving"
elif recent_avg < early_avg - 0.2:
trend = "declining"
conn.close()
return {
"session_id": session_id,
"created_at": created_at,
"last_active": last_active,
"total_interactions": total_interactions,
"message_count": message_count or 0,
"avg_sentiment": round(avg_sentiment or 0.0, 2),
"emotional_trend": trend,
}
def generate_response(
patient_message: str, session_id: str, history: List[Tuple], stats: Dict
) -> str:
"""Generate empathetic therapist response using Ollama."""
# Build context from conversation history
history_text = ""
if history:
for role, msg, sentiment, _ in history[-5:]: # Last 5 exchanges
history_text += f"{role.capitalize()}: {msg}\n"
# Build system prompt with context
system_prompt = f"""You are a compassionate, professional therapist. You remember your patients' history and reference it when relevant.
Session Information:
- Total interactions: {stats.get('total_interactions', 0)}
- Average emotional state: {stats.get('avg_sentiment', 0):.2f} (-1=negative, +1=positive)
- Emotional trend: {stats.get('emotional_trend', 'unknown')}
Recent conversation history:
{history_text if history_text else '(No previous conversation)'}
Patient now says: {patient_message}
Instructions:
1. Respond with empathy and understanding
2. If relevant, reference their previous concerns or topics
3. Ask thoughtful follow-up questions when appropriate
4. Keep responses concise (2-3 sentences)
5. Be warm but professional
Your response:"""
try:
        # Call the model server's OpenAI-compatible chat completions endpoint
response = requests.post(
f"{MODEL_URL}/v1/chat/completions",
json={
"model": MODEL_NAME,
"messages": [
{"role": "system", "content": "You are a compassionate therapist."},
{"role": "user", "content": system_prompt},
],
"temperature": 0.7,
"max_tokens": 200,
},
timeout=60,
)
response.raise_for_status()
result = response.json()
return result["choices"][0]["message"]["content"].strip()
except Exception as e:
# Fallback response if model fails
return f"I'm here to listen and support you. Could you tell me more about what you're experiencing? (Note: AI model temporarily unavailable: {str(e)})"
def show_progress(session_id: str) -> Dict:
"""Generate progress report for the session."""
stats = get_session_stats(session_id)
history = get_conversation_history(session_id, limit=100)
# Analyze sentiment progression
patient_messages = [
(msg, sentiment) for role, msg, sentiment, _ in history if role == "patient"
]
sentiment_progression = []
for i, (msg, sentiment) in enumerate(patient_messages, 1):
sentiment_progression.append(
{
"message_num": i,
"preview": msg[:50] + "..." if len(msg) > 50 else msg,
"sentiment": sentiment,
}
)
return {
"status": "success",
"session_summary": stats,
"sentiment_progression": sentiment_progression,
"conversation_count": len(history),
"persistence_location": str(DB_PATH),
}
def handler(input_data: Dict) -> Dict:
"""Main handler function for Orpheus."""
# Initialize database
init_db()
# Get session_id (Orpheus extracts from X-Session-ID header)
session_id = input_data.get("session_id", "default-session")
# Ensure session exists
ensure_session_exists(session_id)
# Handle different actions
action = input_data.get("action", "chat")
if action == "show_progress":
return show_progress(session_id)
# Default action: chat
patient_message = input_data.get("message", "")
if not patient_message:
return {
"status": "error",
"error": "No message provided. Include 'message' field in request.",
}
# Analyze patient message sentiment
sentiment_score, sentiment_label = analyze_sentiment(patient_message)
# Store patient message
add_conversation(session_id, "patient", patient_message, sentiment_score)
# Update session activity
update_session_activity(session_id)
# Get conversation history and stats
history = get_conversation_history(session_id)
stats = get_session_stats(session_id)
# Generate therapist response
therapist_response = generate_response(patient_message, session_id, history, stats)
# Store therapist response (neutral sentiment)
add_conversation(session_id, "therapist", therapist_response, 0.0)
# Return structured response
return {
"status": "success",
"response": therapist_response,
"patient_sentiment": {
"score": round(sentiment_score, 2),
"label": sentiment_label,
},
"session_stats": stats,
"persistence_proof": f"State stored in {DB_PATH} (survives container restarts)",
"session_affinity": f"Routing via X-Session-ID: {session_id}",
}
if __name__ == "__main__":
# Local testing
test_input = {
"session_id": "test-session",
"message": "I've been feeling really anxious about work lately",
}
result = handler(test_input)
print(json.dumps(result, indent=2))

