Files
project-lyra/neomem/_archive/old_servers/main_backup.py
2025-11-16 03:17:32 -05:00

263 lines
9.2 KiB
Python

import logging
import os
from typing import Any, Dict, List, Optional
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse, RedirectResponse
from pydantic import BaseModel, Field
from nvgram import Memory
app = FastAPI(title="NVGRAM", version="0.1.1")
@app.get("/health")
def health():
return {
"status": "ok",
"version": app.version,
"service": app.title
}
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
# Load environment variables
load_dotenv()
POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "postgres")
POSTGRES_PORT = os.environ.get("POSTGRES_PORT", "5432")
POSTGRES_DB = os.environ.get("POSTGRES_DB", "postgres")
POSTGRES_USER = os.environ.get("POSTGRES_USER", "postgres")
POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "postgres")
POSTGRES_COLLECTION_NAME = os.environ.get("POSTGRES_COLLECTION_NAME", "memories")
NEO4J_URI = os.environ.get("NEO4J_URI", "bolt://neo4j:7687")
NEO4J_USERNAME = os.environ.get("NEO4J_USERNAME", "neo4j")
NEO4J_PASSWORD = os.environ.get("NEO4J_PASSWORD", "mem0graph")
MEMGRAPH_URI = os.environ.get("MEMGRAPH_URI", "bolt://localhost:7687")
MEMGRAPH_USERNAME = os.environ.get("MEMGRAPH_USERNAME", "memgraph")
MEMGRAPH_PASSWORD = os.environ.get("MEMGRAPH_PASSWORD", "mem0graph")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
HISTORY_DB_PATH = os.environ.get("HISTORY_DB_PATH", "/app/history/history.db")
# Embedder settings (switchable by .env)
EMBEDDER_PROVIDER = os.environ.get("EMBEDDER_PROVIDER", "openai")
EMBEDDER_MODEL = os.environ.get("EMBEDDER_MODEL", "text-embedding-3-small")
OLLAMA_HOST = os.environ.get("OLLAMA_HOST") # only used if provider=ollama
DEFAULT_CONFIG = {
"version": "v1.1",
"vector_store": {
"provider": "pgvector",
"config": {
"host": POSTGRES_HOST,
"port": int(POSTGRES_PORT),
"dbname": POSTGRES_DB,
"user": POSTGRES_USER,
"password": POSTGRES_PASSWORD,
"collection_name": POSTGRES_COLLECTION_NAME,
},
},
"graph_store": {
"provider": "neo4j",
"config": {"url": NEO4J_URI, "username": NEO4J_USERNAME, "password": NEO4J_PASSWORD},
},
"llm": {
"provider": os.getenv("LLM_PROVIDER", "ollama"),
"config": {
"model": os.getenv("LLM_MODEL", "qwen2.5:7b-instruct-q4_K_M"),
"ollama_base_url": os.getenv("LLM_API_BASE") or os.getenv("OLLAMA_BASE_URL"),
"temperature": float(os.getenv("LLM_TEMPERATURE", "0.2")),
},
},
"embedder": {
"provider": EMBEDDER_PROVIDER,
"config": {
"model": EMBEDDER_MODEL,
"embedding_dims": int(os.environ.get("EMBEDDING_DIMS", "1536")),
"openai_base_url": os.getenv("OPENAI_BASE_URL"),
"api_key": OPENAI_API_KEY
},
},
"history_db_path": HISTORY_DB_PATH,
}
import time
print(">>> Embedder config:", DEFAULT_CONFIG["embedder"])
# Wait for Neo4j connection before creating Memory instance
for attempt in range(10): # try for about 50 seconds total
try:
MEMORY_INSTANCE = Memory.from_config(DEFAULT_CONFIG)
print(f"✅ Connected to Neo4j on attempt {attempt + 1}")
break
except Exception as e:
print(f"⏳ Waiting for Neo4j (attempt {attempt + 1}/10): {e}")
time.sleep(5)
else:
raise RuntimeError("❌ Could not connect to Neo4j after 10 attempts")
class Message(BaseModel):
role: str = Field(..., description="Role of the message (user or assistant).")
content: str = Field(..., description="Message content.")
class MemoryCreate(BaseModel):
messages: List[Message] = Field(..., description="List of messages to store.")
user_id: Optional[str] = None
agent_id: Optional[str] = None
run_id: Optional[str] = None
metadata: Optional[Dict[str, Any]] = None
class SearchRequest(BaseModel):
query: str = Field(..., description="Search query.")
user_id: Optional[str] = None
run_id: Optional[str] = None
agent_id: Optional[str] = None
filters: Optional[Dict[str, Any]] = None
@app.post("/configure", summary="Configure Mem0")
def set_config(config: Dict[str, Any]):
"""Set memory configuration."""
global MEMORY_INSTANCE
MEMORY_INSTANCE = Memory.from_config(config)
return {"message": "Configuration set successfully"}
@app.post("/memories", summary="Create memories")
def add_memory(memory_create: MemoryCreate):
"""Store new memories."""
if not any([memory_create.user_id, memory_create.agent_id, memory_create.run_id]):
raise HTTPException(status_code=400, detail="At least one identifier (user_id, agent_id, run_id) is required.")
params = {k: v for k, v in memory_create.model_dump().items() if v is not None and k != "messages"}
try:
response = MEMORY_INSTANCE.add(messages=[m.model_dump() for m in memory_create.messages], **params)
return JSONResponse(content=response)
except Exception as e:
logging.exception("Error in add_memory:") # This will log the full traceback
raise HTTPException(status_code=500, detail=str(e))
@app.get("/memories", summary="Get memories")
def get_all_memories(
user_id: Optional[str] = None,
run_id: Optional[str] = None,
agent_id: Optional[str] = None,
):
"""Retrieve stored memories."""
if not any([user_id, run_id, agent_id]):
raise HTTPException(status_code=400, detail="At least one identifier is required.")
try:
params = {
k: v for k, v in {"user_id": user_id, "run_id": run_id, "agent_id": agent_id}.items() if v is not None
}
return MEMORY_INSTANCE.get_all(**params)
except Exception as e:
logging.exception("Error in get_all_memories:")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/memories/{memory_id}", summary="Get a memory")
def get_memory(memory_id: str):
"""Retrieve a specific memory by ID."""
try:
return MEMORY_INSTANCE.get(memory_id)
except Exception as e:
logging.exception("Error in get_memory:")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/search", summary="Search memories")
def search_memories(search_req: SearchRequest):
"""Search for memories based on a query."""
try:
params = {k: v for k, v in search_req.model_dump().items() if v is not None and k != "query"}
return MEMORY_INSTANCE.search(query=search_req.query, **params)
except Exception as e:
logging.exception("Error in search_memories:")
raise HTTPException(status_code=500, detail=str(e))
@app.put("/memories/{memory_id}", summary="Update a memory")
def update_memory(memory_id: str, updated_memory: Dict[str, Any]):
"""Update an existing memory with new content.
Args:
memory_id (str): ID of the memory to update
updated_memory (str): New content to update the memory with
Returns:
dict: Success message indicating the memory was updated
"""
try:
return MEMORY_INSTANCE.update(memory_id=memory_id, data=updated_memory)
except Exception as e:
logging.exception("Error in update_memory:")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/memories/{memory_id}/history", summary="Get memory history")
def memory_history(memory_id: str):
"""Retrieve memory history."""
try:
return MEMORY_INSTANCE.history(memory_id=memory_id)
except Exception as e:
logging.exception("Error in memory_history:")
raise HTTPException(status_code=500, detail=str(e))
@app.delete("/memories/{memory_id}", summary="Delete a memory")
def delete_memory(memory_id: str):
"""Delete a specific memory by ID."""
try:
MEMORY_INSTANCE.delete(memory_id=memory_id)
return {"message": "Memory deleted successfully"}
except Exception as e:
logging.exception("Error in delete_memory:")
raise HTTPException(status_code=500, detail=str(e))
@app.delete("/memories", summary="Delete all memories")
def delete_all_memories(
user_id: Optional[str] = None,
run_id: Optional[str] = None,
agent_id: Optional[str] = None,
):
"""Delete all memories for a given identifier."""
if not any([user_id, run_id, agent_id]):
raise HTTPException(status_code=400, detail="At least one identifier is required.")
try:
params = {
k: v for k, v in {"user_id": user_id, "run_id": run_id, "agent_id": agent_id}.items() if v is not None
}
MEMORY_INSTANCE.delete_all(**params)
return {"message": "All relevant memories deleted"}
except Exception as e:
logging.exception("Error in delete_all_memories:")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/reset", summary="Reset all memories")
def reset_memory():
"""Completely reset stored memories."""
try:
MEMORY_INSTANCE.reset()
return {"message": "All memories reset"}
except Exception as e:
logging.exception("Error in reset_memory:")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/", summary="Redirect to the OpenAPI documentation", include_in_schema=False)
def home():
"""Redirect to the OpenAPI documentation."""
return RedirectResponse(url="/docs")