const { getEnv, SERVICES, SUMMARIES } = require('@nexusai/shared');

/*
 * Background conversation summarization.
 *
 * Once a session's episodes exceed a token threshold, an LLM served by Ollama
 * produces (or extends) a short third-person summary, which is stored through
 * the memory service.
 */

/**
 * Read an integer setting, falling back to `fallback` when the value is
 * missing or not a base-10 integer.
 *
 * A bare `parseInt` yields NaN for malformed input, and every comparison with
 * NaN is false — that would silently disable the threshold / min-episode guards.
 *
 * @param {string} name - Environment variable name passed to getEnv.
 * @param {number|string} fallback - Default used when the variable is unset or invalid.
 * @returns {number} Parsed integer (NaN only if `fallback` itself is not an integer).
 */
function intEnv(name, fallback) {
  const parsed = Number.parseInt(getEnv(name, fallback), 10);
  return Number.isNaN(parsed) ? Number.parseInt(fallback, 10) : parsed;
}

const EXTRACTION_URL = getEnv('EXTRACTION_URL', 'http://localhost:11434');
const EXTRACTION_MODEL = getEnv('EXTRACTION_MODEL', 'qwen2.5:3b');
const MEMORY_URL = getEnv('MEMORY_SERVICE_URL', SERVICES.MEMORY_URL);
const THRESHOLD_TOKENS = intEnv('SUMMARY_THRESHOLD_TOKENS', SUMMARIES.THRESHOLD_TOKENS);
const MAX_SUMMARY_TOKENS = intEnv('SUMMARY_MAX_TOKENS', SUMMARIES.MAX_SUMMARY_TOKENS);
const MIN_EPISODES_SINCE = intEnv('SUMMARY_MIN_EPISODES', SUMMARIES.MIN_EPISODES_SINCE);

// Upper bound on transcript characters sent to the model; older text is dropped first.
const MAX_CONTEXT_CHARS = 3000;

// ChatML control tokens Qwen sometimes echoes back in its output.
const CHATML_BLOCK_RE = /<\|im_start\|>.*?<\|im_end\|>/gs;
const CHATML_TOKEN_RE = /<\|im_start\|>|<\|im_end\|>|<\|im_sep\|>/g;

/**
 * Sum the `token_count` of a list of episodes (missing counts count as 0).
 *
 * @param {Array<{token_count?: number}>} episodes
 * @returns {number}
 */
function sumTokens(episodes) {
  return episodes.reduce((sum, ep) => sum + (ep.token_count || 0), 0);
}

/**
 * Parse a stored `episode_range` string of the form "first-last".
 *
 * @param {string|null|undefined} range
 * @returns {{start: number, end: number}} Each bound is 0 when missing or unparseable.
 */
function parseEpisodeRange(range) {
  const parts = String(range ?? '').split('-');
  const toId = (part) => Number.parseInt(part, 10) || 0;
  return { start: toId(parts.at(0)), end: toId(parts.at(-1)) };
}

/**
 * Smallest and largest episode id in a non-empty list (ids compared numerically).
 *
 * @param {Array<{id: number}>} episodes
 * @returns {{first: number, last: number}}
 */
function episodeIdBounds(episodes) {
  const ids = episodes.map((ep) => ep.id).sort((a, b) => a - b);
  return { first: ids.at(0), last: ids.at(-1) };
}

/**
 * Build a ChatML prompt asking the model to summarize the given episodes, or
 * to fold them into an existing summary.
 *
 * @param {Array<{user_message: string, ai_response: string}>} episodes - Exchanges to summarize, in order.
 * @param {string|null} [existingSummary=null] - When truthy, the model is asked to update it.
 * @returns {string} Prompt text ending with an open assistant turn.
 */
function buildSummaryPrompt(episodes, existingSummary = null) {
  let context = episodes
    .map((ep) => `User: ${ep.user_message ?? ''}\nAssistant: ${ep.ai_response ?? ''}`)
    .join('\n\n');
  if (context.length > MAX_CONTEXT_CHARS) {
    // Keep the tail so the most recent exchanges survive truncation.
    context = context.slice(-MAX_CONTEXT_CHARS);
  }

  const rules = 'Do not quote directly — paraphrase only. Do not include greetings, sign-offs, or filler.';
  const instruction = existingSummary
    ? [
        `Update the summary below to incorporate the new exchanges. Write 3-5 sentences in third person. ${rules} Output only the updated summary text.`,
        '',
        'Previous summary:',
        existingSummary,
        '',
        'New exchanges:',
        context,
      ].join('\n')
    : [
        `Summarize the conversation below in 3-5 sentences. Write in third person. ${rules} Output only the summary text.`,
        '',
        'Conversation:',
        context,
      ].join('\n');

  // ChatML for the qwen2.5 family. generateSummary sends this with `raw: true`
  // so Ollama does not wrap it in the model's own template a second time.
  return ['<|im_start|>user', instruction, '<|im_end|>', '<|im_start|>assistant', ''].join('\n');
}

/**
 * Ask the extraction model for a summary of `episodes`.
 *
 * @param {Array<{user_message: string, ai_response: string}>} episodes
 * @param {string|null} [existingSummary=null] - Summary to extend, if any.
 * @returns {Promise<string>} Summary text with ChatML tokens stripped (may be '').
 * @throws {Error} When Ollama responds with a non-2xx status.
 */
async function generateSummary(episodes, existingSummary = null) {
  const prompt = buildSummaryPrompt(episodes, existingSummary);
  const res = await fetch(`${EXTRACTION_URL}/api/generate`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      model: EXTRACTION_MODEL,
      prompt,
      raw: true, // prompt is already ChatML-templated; skip Ollama's template
      stream: false,
      options: {
        temperature: 0.2, // slightly higher than entities — summaries benefit from some fluency
        num_predict: 500, // generous but bounded — keeps summaries from running long
      },
    }),
  });
  if (!res.ok) throw new Error(`Ollama responded ${res.status}`);

  const data = await res.json();
  const raw = data.response?.trim() ?? '';
  // Strip any leaked ChatML tokens Qwen echoes back.
  return raw.replace(CHATML_BLOCK_RE, '').replace(CHATML_TOKEN_RE, '').trim();
}

/**
 * Send a JSON request and fail loudly on a non-2xx response.
 *
 * @param {string} url
 * @param {string} method - HTTP method, e.g. 'POST' or 'PATCH'.
 * @param {object} body - Serialized with JSON.stringify.
 * @returns {Promise<Response>}
 * @throws {Error} When the response status is not ok.
 */
async function sendJson(url, method, body) {
  const res = await fetch(url, {
    method,
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(body),
  });
  if (!res.ok) throw new Error(`Memory service responded ${res.status} to ${method} ${url}`);
  return res;
}

/**
 * Create or extend the session's summary once it has grown past the token threshold.
 *
 * @param {{id: (string|number)}} session - Session whose summaries are stored under `session.id`.
 * @param {Array<{id: number, token_count?: number, user_message: string, ai_response: string}>} allEpisodes
 *   All episodes of the session, in conversation order.
 * @returns {Promise<void>}
 * @throws {Error} On failures from Ollama or the memory service write.
 */
async function maybeSummarize(session, allEpisodes) {
  // 1. Only summarize once the session is large enough to need it.
  if (sumTokens(allEpisodes) < THRESHOLD_TOKENS) return;

  // 2. Fetch existing summaries; the last entry is treated as the most recent.
  const summariesRes = await fetch(
    `${MEMORY_URL}/sessions/${encodeURIComponent(session.id)}/summaries`,
  );
  if (!summariesRes.ok) {
    console.warn(
      `[summarization] Could not fetch summaries for session ${session.id}: ${summariesRes.status}`,
    );
    return;
  }
  const summaries = await summariesRes.json();
  const latest = summaries.at(-1) ?? null;
  const latestRange = parseEpisodeRange(latest?.episode_range);

  // 3. Episodes not yet covered by the latest summary (all of them if there is none).
  const episodesToSummarize = latest
    ? allEpisodes.filter((ep) => ep.id > latestRange.end)
    : allEpisodes;
  // Guard — don't re-summarize until MIN_EPISODES_SINCE new episodes have accumulated.
  if (latest && episodesToSummarize.length < MIN_EPISODES_SINCE) return;
  if (episodesToSummarize.length === 0) return;

  // 4. Extend the latest summary unless it is already large; then start a fresh row.
  // NOTE(review): content.length counts characters, but the limit is named
  // MAX_SUMMARY_TOKENS — confirm which unit the config intends.
  const extendLatest = latest !== null && (latest.content ?? '').length < MAX_SUMMARY_TOKENS;

  const content = await generateSummary(episodesToSummarize, extendLatest ? latest.content : null);
  if (!content) return;

  // 5. Episode range actually covered by the text being stored, e.g. "1-42".
  // An extended summary keeps the latest row's start; a fresh row starts at its first new episode.
  const newBounds = episodeIdBounds(episodesToSummarize);
  let rangeStart = newBounds.first;
  if (extendLatest) {
    rangeStart = latestRange.start > 0 ? latestRange.start : episodeIdBounds(allEpisodes).first;
  }
  const rangeEnd = newBounds.last;
  const episodeRange = `${rangeStart}-${rangeEnd}`;
  const tokenCount = sumTokens(allEpisodes.filter((ep) => ep.id >= rangeStart && ep.id <= rangeEnd));

  // 6. Update the existing row or create a new one.
  const payload = { content, tokenCount, episodeRange };
  if (extendLatest) {
    await sendJson(`${MEMORY_URL}/summaries/${encodeURIComponent(latest.id)}`, 'PATCH', payload);
    console.log(`[summarization] Updated summary ${latest.id} for session ${session.id}`);
  } else {
    await sendJson(`${MEMORY_URL}/summaries`, 'POST', { sessionId: session.id, ...payload });
    console.log(`[summarization] Created new summary for session ${session.id}`);
  }
}

/**
 * Start summarization in the background; failures are logged, never thrown.
 *
 * @param {{id: (string|number)}} session
 * @param {Array<object>} allEpisodes - All episodes of the session.
 * @returns {Promise<void>} Resolves immediately, without waiting for summarization.
 */
async function triggerSummary(session, allEpisodes) {
  // Intentionally fire-and-forget — caller doesn't await the summarization itself.
  maybeSummarize(session, allEpisodes).catch((err) =>
    console.warn('[summarization] Summary failed (non-critical):', err?.message ?? err),
  );
}

module.exports = { triggerSummary };