docs updated
This commit is contained in:
484
intake/intake.py
484
intake/intake.py
@@ -1,430 +1,160 @@
|
||||
from fastapi import FastAPI, Body, Query, BackgroundTasks
|
||||
from collections import deque
|
||||
from datetime import datetime
|
||||
from uuid import uuid4
|
||||
import requests
|
||||
import os
|
||||
import sys
|
||||
import asyncio
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# ───────────────────────────────────────────────
|
||||
# 🔧 Load environment variables
|
||||
# ───────────────────────────────────────────────
|
||||
load_dotenv()
|
||||
|
||||
# ─────────────────────────────
# Config
# ─────────────────────────────
# Summarizer LLM settings; every value can be overridden via environment
# variables (loaded from .env by load_dotenv() above).
SUMMARY_MODEL = os.getenv("SUMMARY_MODEL_NAME", "mistral-7b-instruct-v0.2.Q4_K_M.gguf")
SUMMARY_URL = os.getenv("SUMMARY_API_URL", "http://localhost:8080/v1/completions")
SUMMARY_MAX_TOKENS = int(os.getenv("SUMMARY_MAX_TOKENS", "200"))
SUMMARY_TEMPERATURE = float(os.getenv("SUMMARY_TEMPERATURE", "0.3"))

# ───────────────────────────────────────────────
# 🧠 NeoMem connection (session-aware)
# ───────────────────────────────────────────────
# NOTE(review): duplicate of the top-of-file `from uuid import uuid4`;
# harmless but redundant.
from uuid import uuid4

# Base URL of the NeoMem service; all pushes are skipped when unset.
NEOMEM_API = os.getenv("NEOMEM_API")
# Optional bearer token for NeoMem authentication.
NEOMEM_KEY = os.getenv("NEOMEM_KEY")
|
||||
|
||||
def push_summary_to_neomem(summary_text: str, level: str, session_id: str):
    """Send summarized text to NeoMem, tagged by session_id.

    Args:
        summary_text: Summary to store as an assistant message.
        level: Summary tier label (e.g. "L10", "LFinal") kept in metadata.
        session_id: Session the summary belongs to.

    Best-effort: failures are printed, never raised, so summarization keeps
    working even when NeoMem is unreachable.
    """
    if not NEOMEM_API:
        print("⚠️ NEOMEM_API not set, skipping NeoMem push")
        return

    headers = {"Content-Type": "application/json"}
    if NEOMEM_KEY:
        headers["Authorization"] = f"Bearer {NEOMEM_KEY}"

    payload = {
        "messages": [
            {"role": "assistant", "content": summary_text}
        ],
        "user_id": "brian",
        # optional: uncomment if you want sessions tracked in NeoMem natively
        # "run_id": session_id,
        "metadata": {
            "source": "intake",
            "type": "summary",
            "level": level,
            "session_id": session_id,
            "cortex": {}
        }
    }

    # Original merge artifact built the payload but never sent it; POST once.
    try:
        resp = requests.post(f"{NEOMEM_API}/memories", json=payload, headers=headers, timeout=25)
        resp.raise_for_status()
        print(f"🧠 NeoMem updated ({level}, {session_id}, {len(summary_text)} chars)")
    except Exception as e:
        print(f"❌ NeoMem push failed ({level}, {session_id}): {e}")
|
||||
@app.on_event("startup")
def banner():
    """Emit a boot banner on startup so logs show the active model/config."""
    for line in (
        "🧩 Intake v0.2 booting...",
        f" Model: {SUMMARY_MODEL}",
        f" API: {SUMMARY_URL}",
    ):
        print(line)
    sys.stdout.flush()
|
||||
|
||||
# ─────────────────────────────
|
||||
# Helper: summarize exchanges
|
||||
# ─────────────────────────────
|
||||
def llm(prompt: str):
    """Call the completion endpoint and return the generated text.

    Never raises: any request/HTTP failure is folded into an error string
    so callers can treat the result uniformly as text.
    """
    request_body = {
        "model": SUMMARY_MODEL,
        "prompt": prompt,
        "max_tokens": SUMMARY_MAX_TOKENS,
        "temperature": SUMMARY_TEMPERATURE,
    }
    try:
        response = requests.post(SUMMARY_URL, json=request_body, timeout=30)
        response.raise_for_status()
        choices = response.json().get("choices", [{}])
        return choices[0].get("text", "").strip()
    except Exception as e:
        return f"[Error summarizing: {e}]"
|
||||
|
||||
def summarize_simple(exchanges):
    """Simple factual summary of recent exchanges."""
    # Render the exchanges as a transcript in one pass.
    text = "".join(
        f"User: {e['user_msg']}\nAssistant: {e['assistant_msg']}\n\n"
        for e in exchanges
    )

    prompt = f"""
Summarize the following conversation between Brian (user) and Lyra (assistant).
Focus only on factual content. Avoid names, examples, story tone, or invented details.

{text}

Summary:
"""
    return llm(prompt)
|
||||
|
||||
# ─────────────────────────────
|
||||
# NeoMem push
|
||||
# ─────────────────────────────
|
||||
def push_to_neomem(summary: str, session_id: str):
    """Push a plain session summary to NeoMem (best-effort, never raises).

    Args:
        summary: Summary text stored as an assistant message.
        session_id: Session the summary belongs to (kept in metadata).
    """
    if not NEOMEM_API:
        return

    headers = {"Content-Type": "application/json"}
    if NEOMEM_KEY:
        headers["Authorization"] = f"Bearer {NEOMEM_KEY}"

    payload = {
        "messages": [{"role": "assistant", "content": summary}],
        "user_id": "brian",
        "metadata": {
            "source": "intake",
            "session_id": session_id
        }
    }

    try:
        # Fixed: original POSTed the same payload twice and its log lines
        # referenced undefined names (`level`, `summary_text`), raising a
        # NameError even inside the except handler. Send once, log safely.
        r = requests.post(f"{NEOMEM_API}/memories", json=payload, headers=headers, timeout=25)
        r.raise_for_status()
        print(f"🧠 NeoMem updated for {session_id}")
    except Exception as e:
        print(f"NeoMem push failed: {e}")
|
||||
|
||||
|
||||
# ───────────────────────────────────────────────
# ⚙️ FastAPI + buffer setup
# ───────────────────────────────────────────────
app = FastAPI()

# Multiple rolling buffers keyed by session_id.
# Each value is a dict holding a deque "buffer" of exchange dicts plus
# bookkeeping timestamps (see add_exchange).
SESSIONS = {}

# Summary trigger points
# → low-tier: quick factual recaps
# → mid-tier: “Reality Check” reflections
# → high-tier: rolling continuity synthesis
LEVELS = [1, 2, 5, 10, 20, 30]
|
||||
|
||||
@app.on_event("startup")
def show_boot_banner():
    """Log the effective summarizer configuration when the app starts."""
    banner_lines = [
        "🧩 Intake booting...",
        f" Model: {SUMMARY_MODEL}",
        f" API: {SUMMARY_URL}",
        f" Max tokens: {SUMMARY_MAX_TOKENS}, Temp: {SUMMARY_TEMPERATURE}",
    ]
    for ln in banner_lines:
        print(ln)
    sys.stdout.flush()
|
||||
|
||||
# ───────────────────────────────────────────────
# 🧠 Hierarchical Summarizer (L10→L20→L30 cascade)
# ───────────────────────────────────────────────
# Rolling in-process cache of tiered summaries, appended to by summarize().
# NOTE(review): module-global, shared across ALL sessions, and never pruned —
# it grows for the life of the process. Confirm that is intended.
SUMMARIES_CACHE = {"L10": [], "L20": [], "L30": []}
||||
def summarize(exchanges, level):
    """Hierarchical summarizer: builds local and meta summaries.

    Args:
        exchanges: list of exchange dicts; each is read as
            e['turns'][0]['content'] (user) and e['turns'][1]['content']
            (assistant) — assumes exactly this shape, TODO confirm callers.
        level: summary tier. 10/20/30 trigger the hierarchical branches;
            any other value falls through to a plain factual summary.

    Returns:
        A tagged string "[L<level> Summary of N exchanges]: <summary>".
        LLM errors are returned inside the summary text, never raised.
    """
    # Join exchanges into readable text
    text = "\n".join(
        f"User: {e['turns'][0]['content']}\nAssistant: {e['turns'][1]['content']}"
        for e in exchanges
    )

    def query_llm(prompt: str):
        # Local LLM call with a long timeout (180s) for slow local models;
        # failures are folded into the returned text.
        try:
            resp = requests.post(
                SUMMARY_URL,
                json={
                    "model": SUMMARY_MODEL,
                    "prompt": prompt,
                    "max_tokens": SUMMARY_MAX_TOKENS,
                    "temperature": SUMMARY_TEMPERATURE,
                },
                timeout=180,
            )
            resp.raise_for_status()
            data = resp.json()
            return data.get("choices", [{}])[0].get("text", "").strip()
        except Exception as e:
            return f"[Error summarizing: {e}]"

    # ───── L10: local “Reality Check” block ─────
    if level == 10:
        prompt = f"""
You are Lyra Intake performing a 'Reality Check' for the last {len(exchanges)} exchanges.
Summarize this block as one coherent paragraph describing the user’s focus, progress, and tone.
Avoid bullet points.

Exchanges:
{text}

Reality Check Summary:
"""
        summary = query_llm(prompt)
        SUMMARIES_CACHE["L10"].append(summary)

    # ───── L20: merge L10s ─────
    elif level == 20:
        # 1️⃣ create fresh L10 for 11–20
        l10_prompt = f"""
You are Lyra Intake generating a second Reality Check for the most recent {len(exchanges)} exchanges.
Summarize them as one paragraph describing what's new or changed since the last block.
Avoid bullet points.

Exchanges:
{text}

Reality Check Summary:
"""
        new_l10 = query_llm(l10_prompt)
        SUMMARIES_CACHE["L10"].append(new_l10)

        # 2️⃣ merge all L10s into a Session Overview
        joined_l10s = "\n\n".join(SUMMARIES_CACHE["L10"])
        l20_prompt = f"""
You are Lyra Intake merging multiple 'Reality Checks' into a single Session Overview.
Summarize the following Reality Checks into one short paragraph capturing the ongoing goals,
patterns, and overall progress.

Reality Checks:
{joined_l10s}

Session Overview:
"""
        l20_summary = query_llm(l20_prompt)
        SUMMARIES_CACHE["L20"].append(l20_summary)
        # Returned summary is the fresh L10 plus the merged overview.
        summary = new_l10 + "\n\n" + l20_summary

    # ───── L30: continuity synthesis ─────
    elif level == 30:
        # 1️⃣ create new L10 for 21–30
        new_l10 = query_llm(f"""
You are Lyra Intake creating a new Reality Check for exchanges 21–30.
Summarize this block in one cohesive paragraph, describing any shifts in focus or tone.

Exchanges:
{text}

Reality Check Summary:
""")
        SUMMARIES_CACHE["L10"].append(new_l10)

        # 2️⃣ merge all lower levels for continuity
        joined = "\n\n".join(SUMMARIES_CACHE["L10"] + SUMMARIES_CACHE["L20"])
        continuity_prompt = f"""
You are Lyra Intake performing a 'Continuity Report' — a high-level reflection combining all Reality Checks
and Session Overviews so far. Describe how the conversation has evolved, the key insights, and remaining threads.

Reality Checks and Overviews:
{joined}

Continuity Report:
"""
        l30_summary = query_llm(continuity_prompt)
        SUMMARIES_CACHE["L30"].append(l30_summary)
        summary = new_l10 + "\n\n" + l30_summary

    # ───── L1–L5 (standard factual summaries) ─────
    else:
        prompt = f"""
You are Lyra Intake, a background summarization module for an AI assistant.

Your job is to compress recent chat exchanges between a user and an assistant
into a short, factual summary. The user's name is Brian, and the assistant's name is Lyra.
Focus only on the real conversation content.
Do NOT invent names, people, or examples. Avoid speculation or storytelling.

Summarize clearly what topics were discussed and what conclusions were reached.
Avoid speculation, names, or bullet points.

Exchanges:
{text}

Summary:
"""
        summary = query_llm(prompt)

    return f"[L{level} Summary of {len(exchanges)} exchanges]: {summary}"
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
# Persistent summary log directory, created once at import time.
LOG_DIR = "/app/logs"
os.makedirs(LOG_DIR, exist_ok=True)

def log_to_file(level: str, summary: str):
    """Append each summary to a persistent .txt log file."""
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    entry = f"[{stamp}] {level}\n{summary}\n{'='*60}\n\n"
    log_path = os.path.join(LOG_DIR, "summaries.log")
    with open(log_path, "a", encoding="utf-8") as fh:
        fh.write(entry)
|
||||
|
||||
# ───────────────────────────────────────────────
# 🔁 Background summarization helper
# ───────────────────────────────────────────────
def run_summarization_task(exchange, session_id):
    """Async-friendly wrapper for slow summarization work.

    Kept for backward compatibility with callers that pass the exchange;
    the exchange itself is not needed because bg_summarize reads the
    session buffer directly. (Original lost its body in a merge.)
    """
    bg_summarize(session_id)


def bg_summarize(session_id: str):
    """Summarize a session's buffer from a background task.

    Runs the tiered summarizer at every LEVELS milestone reached so far,
    logging each summary and pushing it to NeoMem. Never raises — errors
    are printed so the FastAPI background task cannot crash the app.
    """
    try:
        hopper = SESSIONS.get(session_id)
        if not hopper:
            print(f"⚠️ No hopper found for {session_id}")
            return

        buffer = hopper["buffer"]
        count = len(buffer)
        summaries = {}

        if count < 30:
            for lvl in LEVELS:
                if lvl <= count:
                    # Summarize the most recent `lvl` exchanges at tier lvl.
                    s_text = summarize(list(buffer)[-lvl:], lvl)
                    log_to_file(f"L{lvl}", s_text)
                    push_summary_to_neomem(s_text, f"L{lvl}", session_id)
                    summaries[f"L{lvl}"] = s_text
        else:
            # optional: include your existing 30+ logic here
            pass

        if summaries:
            print(f"🧩 [BG] Summaries generated asynchronously at count={count}: {list(summaries.keys())}")
    except Exception as e:
        print(f"💥 [BG] Async summarization failed: {e}")
|
||||
|
||||
# ─────────────────────────────
|
||||
# Routes
|
||||
# ─────────────────────────────
|
||||
|
||||
# ───────────────────────────────────────────────
|
||||
# 📨 Routes
|
||||
# ───────────────────────────────────────────────
|
||||
@app.post("/add_exchange")
def add_exchange(exchange: dict = Body(...), background_tasks: BackgroundTasks = None):
    """Ingest one user/assistant exchange into the session's rolling buffer.

    Creates the session hopper on first sight, appends the exchange exactly
    once (the merged original appended it twice), stamps bookkeeping times,
    and queues non-blocking background summarization.

    Returns a dict with the resolved session_id, the current buffer size,
    and whether summarization was queued.
    """
    # Resolve or mint a session id, and stamp the exchange.
    session_id = exchange.get("session_id") or f"sess-{uuid4().hex[:8]}"
    exchange["session_id"] = session_id
    exchange["timestamp"] = datetime.now().isoformat()

    if session_id not in SESSIONS:
        SESSIONS[session_id] = {
            "buffer": deque(maxlen=200),
            "created_at": datetime.now(),
        }
        print(f"🆕 Hopper created: {session_id}")

    hopper = SESSIONS[session_id]
    hopper["buffer"].append(exchange)
    hopper["last_update"] = datetime.now()
    count = len(hopper["buffer"])

    # 🚀 queue background summarization (single task; the merged original
    # queued both run_summarization_task and bg_summarize)
    if background_tasks:
        background_tasks.add_task(bg_summarize, session_id)
        print(f"⏩ Summarization queued for {session_id}")

    return {
        "ok": True,
        "session_id": session_id,
        "exchange_count": count,
        "queued": bool(background_tasks),
    }
|
||||
|
||||
|
||||
# # ── Normal tiered behavior up to 30 ── commented out for aysnc addon
|
||||
# if count < 30:
|
||||
# if count in LEVELS:
|
||||
# for lvl in LEVELS:
|
||||
# if lvl <= count:
|
||||
# summaries[f"L{lvl}"] = summarize(list(buffer)[-lvl:], lvl)
|
||||
# log_to_file(f"L{lvl}", summaries[f"L{lvl}"])
|
||||
# push_summary_to_neomem(summaries[f"L{lvl}"], f"L{lvl}", session_id)
|
||||
|
||||
# # 🚀 Launch summarization in the background (non-blocking)
|
||||
# if background_tasks:
|
||||
# background_tasks.add_task(run_summarization_task, exchange, session_id)
|
||||
# print(f"⏩ Queued async summarization for {session_id}")
|
||||
|
||||
|
||||
# # ── Beyond 30: keep summarizing every +15 exchanges ──
|
||||
# else:
|
||||
# # Find next milestone after 30 (45, 60, 75, ...)
|
||||
# milestone = 30 + ((count - 30) // 15) * 15
|
||||
# if count == milestone:
|
||||
# summaries[f"L{milestone}"] = summarize(list(buffer)[-15:], milestone)
|
||||
# log_to_file(f"L{milestone}", summaries[f"L{milestone}"])
|
||||
# push_summary_to_neomem(summaries[f"L{milestone}"], f"L{milestone}", session_id)
|
||||
|
||||
# # Optional: merge all continuity summaries so far into a running meta-summary
|
||||
# joined = "\n\n".join(
|
||||
# [s for key, s in summaries.items() if key.startswith("L")]
|
||||
# )
|
||||
# meta_prompt = f"""
|
||||
# You are Lyra Intake composing an 'Ongoing Continuity Report' that merges
|
||||
# all prior continuity summaries into one living narrative.
|
||||
# Focus on major themes, changes, and lessons so far.
|
||||
|
||||
# Continuity Summaries:
|
||||
# {joined}
|
||||
|
||||
# Ongoing Continuity Report:
|
||||
# """
|
||||
# meta_summary = f"[L∞ Ongoing Continuity Report]: {query_llm(meta_prompt)}"
|
||||
# summaries["L∞"] = meta_summary
|
||||
# log_to_file("L∞", meta_summary)
|
||||
# push_summary_to_neomem(meta_summary, "L∞", session_id)
|
||||
|
||||
# print(f"🌀 L{milestone} continuity summary created (messages {count-14}-{count})")
|
||||
|
||||
# # ── Log summaries ──
|
||||
# if summaries:
|
||||
# print(f"🧩 Summaries generated at count={count}: {list(summaries.keys())}")
|
||||
|
||||
# return {
|
||||
# "ok": True,
|
||||
# "exchange_count": len(buffer),
|
||||
# "queued": True
|
||||
# }
|
||||
|
||||
# ───────────────────────────────────────────────
# Clear rubbish from hopper.
# ───────────────────────────────────────────────
def close_session(session_id: str):
    """Run a final summary for the given hopper, post it to NeoMem, then delete it."""
    hopper = SESSIONS.get(session_id)
    if not hopper:
        # Nothing to close — leave SESSIONS untouched.
        print(f"⚠️ No active hopper for {session_id}")
        return

    buffer = hopper["buffer"]
    if not buffer:
        # Empty buffer: delete immediately, no summary worth producing.
        print(f"⚠️ Hopper {session_id} is empty, skipping closure")
        del SESSIONS[session_id]
        return

    try:
        print(f"🔒 Closing hopper {session_id} ({len(buffer)} exchanges)")

        # Summarize everything left in the buffer
        final_summary = summarize(list(buffer), 30)  # level 30 = continuity synthesis
        log_to_file("LFinal", final_summary)
        push_summary_to_neomem(final_summary, "LFinal", session_id)

        # Optionally: mark this as a special 'closure' memory
        closure_note = f"[Session {session_id} closed with {len(buffer)} exchanges]"
        push_summary_to_neomem(closure_note, "LFinalNote", session_id)

        print(f"🧹 Hopper {session_id} closed and deleted")
    except Exception as e:
        print(f"💥 Error closing hopper {session_id}: {e}")
    finally:
        # Delete the hopper whether or not summarization succeeded, so a
        # failed close cannot leave a stale session behind.
        del SESSIONS[session_id]
|
||||
|
||||
@app.post("/close_session/{session_id}")
def close_session_endpoint(session_id: str):
    """HTTP wrapper: finalize and delete a session's hopper.

    Delegates to close_session() (final summary + NeoMem push + delete).
    Fixed: the merged original redefined `close_session` right after this
    endpoint with a bare dict-delete, shadowing the full closure logic and
    leaving the endpoint without a return value.
    """
    close_session(session_id)
    return {"ok": True, "closed": session_id}
|
||||
|
||||
# ───────────────────────────────────────────────
# 🧾 Provide recent summary for Cortex /reason calls
# ───────────────────────────────────────────────
@app.get("/summaries")
def get_summary(session_id: str = Query(..., description="Active session ID")):
    """
    Return the most recent summary (L10→L30→LFinal) for a given session.
    If none exist yet, return a placeholder summary.

    Fixed: the merge interleaved a second, hopper-based `def get_summary`
    inside this body; the decorated log-file implementation is kept.
    """
    try:
        # Find the most recent file entry in summaries.log
        log_path = os.path.join(LOG_DIR, "summaries.log")
        if not os.path.exists(log_path):
            return {
                "summary_text": "(none)",
                "last_message_ts": datetime.now().isoformat(),
                "session_id": session_id,
                "exchange_count": 0,
            }

        with open(log_path, "r", encoding="utf-8") as f:
            lines = f.readlines()

        # Grab the last summary section that mentions this session_id
        recent_lines = [ln for ln in lines if session_id in ln or ln.startswith("[L")]
        if recent_lines:
            # Find the last non-empty summary text
            snippet = "".join(recent_lines[-8:]).strip()
        else:
            snippet = "(no summaries yet)"

        return {
            "summary_text": snippet[-1000:],  # truncate to avoid huge block
            "last_message_ts": datetime.now().isoformat(),
            "session_id": session_id,
            "exchange_count": len(SESSIONS.get(session_id, {}).get("buffer", [])),
        }

    except Exception as e:
        print(f"⚠️ /summaries failed for {session_id}: {e}")
        return {
            "summary_text": f"(error fetching summaries: {e})",
            "last_message_ts": datetime.now().isoformat(),
            "session_id": session_id,
            "exchange_count": 0,
        }
|
||||
|
||||
# ───────────────────────────────────────────────
# ✅ Health check
# ───────────────────────────────────────────────
@app.get("/health")
def health():
    """Liveness probe: report the configured summarizer model and endpoint."""
    status = {"ok": True, "model": SUMMARY_MODEL, "url": SUMMARY_URL}
    return status
|
||||
|
||||
Reference in New Issue
Block a user