Files
nexusAI/packages/orchestration-service/src/chat/index.js
2026-04-26 23:04:31 -07:00

259 lines
8.9 KiB
JavaScript

const memory = require("../services/memory");
const inference = require("../services/inference");
const embedding = require("../services/embedding");
const qdrant = require("../services/qdrant");
const { ORCHESTRATION, logger } = require("@nexusai/shared");
const appSettings = require("../config/settings");
const {triggerSummary} = require('../services/summarization')
/**
 * Render one entity as a bullet line.
 *
 * The type and notes parts are omitted when absent, so the prompt never
 * contains a literal "undefined".
 *
 * @param {{name: string, type?: string, notes?: string}} entity - Entity payload.
 * @returns {string} e.g. "- Ada (person): mathematician" or "- Ada (person)".
 */
function formatEntityLine({ name, type, notes }) {
  const label = type ? `${name} (${type})` : `${name}`;
  return notes ? `- ${label}: ${notes}` : `- ${label}`;
}

/**
 * Assemble the completion prompt from the system prompt, retrieved context and
 * the new user message.
 *
 * Section order:
 *   1. system prompt
 *   2. known entities
 *   3. semantically related episodes
 *   4. the most recent episodes of this session
 *   5. the user turn, followed by an open "Assistant:" cue
 * Empty sections are omitted.
 *
 * @param {Array<{user_message: string, ai_response: string}>} recentEpisodes -
 *   Latest episodes of the current session.
 *   NOTE(review): assumed to be in chronological order — confirm
 *   memory.getRecentEpisodes ordering.
 * @param {Array<{user_message: string, ai_response: string}>} semanticEpisodes -
 *   Episodes returned by vector search.
 * @param {Array<{name: string, type?: string, notes?: string}>} entities -
 *   Entity payloads.
 * @param {string} userMessage - The message being answered.
 * @param {string} [systemPrompt] - Used instead of ORCHESTRATION.SYSTEM_PROMPT
 *   when not null/undefined.
 * @returns {string} The full prompt text.
 */
function buildPrompt(recentEpisodes, semanticEpisodes, entities, userMessage, systemPrompt) {
  const formatEpisode = (ep) => `User: ${ep.user_message}\nAssistant: ${ep.ai_response}`;
  const parts = [systemPrompt ?? ORCHESTRATION.SYSTEM_PROMPT];

  if (entities.length > 0) {
    parts.push("Here is what you know about entities relevant to this conversation:");
    parts.push(...entities.map(formatEntityLine));
    parts.push("---");
  }

  if (semanticEpisodes.length > 0) {
    parts.push("Here are some relevant memories from earlier conversations:");
    parts.push(...semanticEpisodes.map(formatEpisode));
    parts.push("---");
  }

  if (recentEpisodes.length > 0) {
    // These are the latest turns of *this* session (not relevance-ranked
    // memories), so label them as the ongoing conversation for the model.
    parts.push("Here is the most recent part of this conversation:");
    parts.push(...recentEpisodes.map(formatEpisode));
    parts.push("--- End of recent conversation ---\n");
  }

  parts.push(`User: ${userMessage}`);
  parts.push("Assistant:");
  return parts.join("\n");
}
/**
 * Build the prompt that asks the model for a short conversation title.
 *
 * @param {string} userMessage - First user message of the conversation.
 * @param {string} aiResponse - Assistant reply to that message.
 * @returns {string} Naming instructions, the exchange, and a trailing "Title:" cue.
 */
function buildNamingPrompt(userMessage, aiResponse) {
  const instructions = [
    "Your task is to generate a short title for a conversation based on its first exchange.",
    "Rules: maximum 5 words, no punctuation, no quotes, plain text only.",
    'Examples: "Setting up a Raspberry Pi", "Help with Python list comprehension", "Planning a trip to Japan"',
  ];
  const exchange = [`User: ${userMessage}`, `Assistant: ${aiResponse}`];
  const cue = "Title:";

  // Blank lines separate the instructions, the exchange, and the completion cue.
  const sections = [...instructions, "", ...exchange, "", cue];
  return sections.join("\n");
}
// Whitespace and quote characters models commonly wrap a title in.
const TITLE_LEADING_JUNK = /^[\s"'`“”‘’]+/;
// Same as above plus trailing punctuation (the naming rules forbid punctuation).
const TITLE_TRAILING_JUNK = /[\s"'`“”‘’.!?,;:]+$/;
// An echoed "Title:" label at the start of the output.
const TITLE_LABEL = /^title\s*:\s*/i;

/**
 * Reduce raw model output to a bare title.
 *
 * Processing steps:
 *   1. Keep only the first non-empty line; later lines are usually
 *      explanations.
 *   2. Drop an echoed "Title:" label.
 *   3. Strip surrounding quotes (straight and typographic) and trailing
 *      punctuation.
 *
 * @param {*} raw - Model output text (non-strings yield "").
 * @returns {string} The cleaned title, or "" if nothing usable remains.
 */
function sanitizeTitle(raw) {
  if (typeof raw !== "string") return "";
  const firstLine = raw
    .split("\n")
    .map((line) => line.trim())
    .find(Boolean) ?? "";
  return firstLine
    .replace(TITLE_LEADING_JUNK, "")
    .replace(TITLE_LABEL, "")
    .replace(TITLE_LEADING_JUNK, "") // quotes may follow the label: Title: "…"
    .replace(TITLE_TRAILING_JUNK, "");
}

/**
 * Ask the model for a short title for a session and store it as the session name.
 *
 * Best-effort: every failure is caught and logged at warn level, so the
 * returned promise never rejects.
 *
 * @param {string} externalId - External session id passed to memory.updateSession.
 * @param {string} userMessage - First user message of the session.
 * @param {string} aiResponse - Assistant reply to that message.
 * @returns {Promise<void>}
 */
async function autoNameSession(externalId, userMessage, aiResponse) {
  try {
    const prompt = buildNamingPrompt(userMessage, aiResponse);
    const result = await inference.complete(prompt, {
      maxTokens: 20, // title only needs a handful of tokens
      temperature: 0.3, // low temperature for consistent, factual naming
    });
    const name = sanitizeTitle(result?.text);
    if (name) {
      await memory.updateSession(externalId, { name });
      logger.info(
        `[orchestration] Auto-named session "${externalId}": "${name}"`,
      );
    }
  } catch (err) {
    logger.warn(
      "[orchestration] Auto-naming failed (non-critical):",
      err.message,
    );
  }
}
/**
 * Vector-search stored episodes related to `userMessage`, skipping any already
 * included in the recent-history window.
 *
 * Best-effort: any failure is logged at warn level and yields [].
 *
 * @param {string} userMessage - Text to embed as the search query.
 * @param {*} sessionId - Session to search within when no project scope is given.
 * @param {Set<*>} recentIds - Episode ids to exclude from the results.
 *   NOTE(review): assumes qdrant hit ids have the same type as episode ids from
 *   memory; a type mismatch would make this filter a no-op — confirm.
 * @param {Array<*>|null} [projectSessionIds=null] - When set, the search spans
 *   these sessions instead of `sessionId`.
 * @param {{semanticLimit?: number, scoreThreshold?: number}} [opts] - Search tuning.
 * @returns {Promise<object[]>} Episodes loaded via memory.getEpisodeById, with
 *   missing ones dropped.
 */
async function getSemanticEpisodes(
  userMessage,
  sessionId,
  recentIds,
  projectSessionIds = null,
  { semanticLimit, scoreThreshold } = {},
) {
  try {
    const queryVector = await embedding.embed(userMessage);
    const searchOptions = {
      limit: semanticLimit,
      scoreThreshold,
      // A project scope replaces the single-session filter.
      sessionId: projectSessionIds ? null : sessionId,
      projectSessionIds,
    };
    const hits = await qdrant.searchEpisodes(queryVector, searchOptions);

    const unseenIds = [];
    for (const hit of hits) {
      if (!recentIds.has(hit.id)) unseenIds.push(hit.id);
    }

    const episodes = await Promise.all(unseenIds.map((id) => memory.getEpisodeById(id)));
    return episodes.filter((episode) => Boolean(episode));
  } catch (err) {
    logger.warn(
      "[orchestration] Semantic search failed, continuing without: ",
      err.message,
    );
    return [];
  }
}
/**
 * Vector-search stored entities related to `userMessage`.
 *
 * Best-effort: any embedding or search failure is logged and yields [], so the
 * chat can continue without entity context.
 *
 * @param {string} userMessage - Text to embed as the search query.
 * @param {*} [projectId=null] - Session's project id, forwarded to
 *   qdrant.searchEntities as `projectId`.
 * @returns {Promise<object[]>} Non-empty `payload` objects of the search hits.
 */
async function getRelevantEntities(userMessage, projectId = null) {
  try {
    const vector = await embedding.embed(userMessage);
    const results = await qdrant.searchEntities(vector, { projectId });
    // Per-request diagnostics: emitted at debug so ordinary chat traffic does
    // not flood the info log.
    logger.debug(
      "[orchestration] Entity search results:",
      results.map((r) => ({ name: r.payload?.name, score: r.score })),
    );
    return results.map((r) => r.payload).filter(Boolean);
  } catch (err) {
    // warn, matching getSemanticEpisodes: a failed retrieval silently degrades
    // answer quality, so it should not be hidden behind debug logging.
    logger.warn(
      "[orchestration] Entity search failed, continuing without:",
      err.message,
    );
    return [];
  }
}
/**
 * Gather everything needed to answer `userMessage` in the session identified
 * by `externalId`.
 *
 * Steps:
 *   1. Resolve the session, creating it if needed.
 *   2. Apply project context (system prompt, cross-session search scope) when
 *      the session belongs to a project.
 *   3. Load recent episodes.
 *   4. Run semantic-episode and entity retrieval.
 *   5. Build the prompt.
 *
 * @param {string} externalId - Caller-facing session identifier.
 * @param {string} userMessage - The message being answered.
 * @returns {Promise<{session: object, prompt: string, isFirstMessage: boolean,
 *   inferenceOptions: object}>}
 *   `inferenceOptions` contains only the sampling settings that are actually
 *   set (not null/undefined).
 */
async function assembleContext(externalId, userMessage) {
  const settings = appSettings.load();
  const {
    recentEpisodeLimit, semanticLimit, scoreThreshold,
    temperature, repeatPenalty, topP, topK, systemPrompt,
  } = settings;

  // 1. Resolve or create session.
  // NOTE(review): get-then-create is not atomic; two concurrent first messages
  // for the same externalId may both reach createSession — confirm the memory
  // service tolerates this.
  let session = await memory.getSessionByExternalId(externalId);
  if (!session) session = await memory.createSession(externalId);

  // 2. Resolve project context (a project's own system prompt wins when set).
  let projectSessionIds = null;
  let activeSystemPrompt = systemPrompt ?? ORCHESTRATION.SYSTEM_PROMPT;
  if (session.project_id) {
    try {
      const project = await memory.getProject(session.project_id);
      if (project) {
        const projectSessions = await memory.getProjectSessions(session.project_id);
        if (project.system_prompt) activeSystemPrompt = project.system_prompt;
        projectSessionIds = projectSessions.map((s) => s.id);
      }
    } catch (err) {
      logger.warn("[orchestration] Failed to resolve project context:", err.message);
    }
  }

  // 3. Fetch recent episodes; none means this is the session's first exchange.
  const recentEpisodes = await memory.getRecentEpisodes(session.id, recentEpisodeLimit);
  const isFirstMessage = recentEpisodes.length === 0;
  const recentIds = new Set(recentEpisodes.map((e) => e.id));

  // 4. Semantic + entity search. They are independent, and both catch their own
  //    errors and return [], so run them concurrently.
  const [semanticEpisodes, entities] = await Promise.all([
    getSemanticEpisodes(
      userMessage, session.id, recentIds, projectSessionIds, { semanticLimit, scoreThreshold },
    ),
    getRelevantEntities(userMessage, session.project_id ?? null),
  ]);

  // 5. Assemble prompt.
  const prompt = buildPrompt(recentEpisodes, semanticEpisodes, entities, userMessage, activeSystemPrompt);

  // Drop unset sampling settings. chat/chatStream spread these over the
  // caller's options, so an undefined key would otherwise erase a value the
  // caller supplied.
  const inferenceOptions = Object.fromEntries(
    Object.entries({ temperature, repeatPenalty, topP, topK })
      .filter(([, value]) => value !== undefined && value !== null),
  );

  return {
    session,
    prompt,
    isFirstMessage,
    inferenceOptions,
  };
}
/**
 * Answer `userMessage` in the session identified by `externalId` with a single
 * (non-streaming) completion, then record the exchange.
 *
 * Post-response bookkeeping is best-effort: failures are logged and never
 * discard the reply. This covers saving the episode, triggering summarization
 * and auto-naming a new session.
 *
 * @param {string} externalId - Caller-facing session identifier.
 * @param {string} userMessage - The message to answer.
 * @param {object} [options] - Per-call inference options. Sampling settings
 *   from assembleContext take precedence.
 * @returns {Promise<{sessionId: string, response: string, model: string,
 *   tokenCount: number}>}
 * @throws {Error} If context assembly or the inference call fails.
 */
async function chat(externalId, userMessage, options = {}) {
  const { session, prompt, isFirstMessage, inferenceOptions } = await assembleContext(externalId, userMessage);
  const result = await inference.complete(prompt, { ...options, ...inferenceOptions });
  const tokenCount = (result.evalCount || 0) + (result.promptEvalCount || 0);

  try {
    await memory.createEpisode(
      session.id, userMessage, result.text,
      tokenCount,
      session.project_id ?? null,
    );
  } catch (err) {
    logger.error("[orchestration] Failed to save episode:", err.message);
  }

  const logSummaryFailure = (err) => {
    logger.warn("[orchestration] Summary trigger failed (non-critical):", err?.message);
  };
  // Summarization is bookkeeping: its failure must not lose a reply the model
  // has already produced.
  try {
    const allEpisodes = await memory.getRecentEpisodes(session.id, 9999);
    // Not awaited (fire-and-forget, as before). If triggerSummary returns a
    // promise, attach a handler so a rejection is logged rather than unhandled.
    Promise.resolve(triggerSummary(session, allEpisodes)).catch(logSummaryFailure);
  } catch (err) {
    logSummaryFailure(err);
  }

  if (isFirstMessage && !session.name) {
    autoNameSession(externalId, userMessage, result.text).catch(() => {});
  }

  return {
    sessionId: externalId,
    response: result.text,
    model: result.model,
    tokenCount,
  };
}
/**
 * Extract the payload of one Server-Sent Events block.
 *
 * @param {string} event - Text of one event (the lines between blank-line separators).
 * @returns {string|null} The `data: ` lines joined with "\n" and trimmed, or
 *   null if the event has no data lines.
 */
function extractSseData(event) {
  const dataLines = event
    .split("\n")
    .filter((line) => line.startsWith("data: "))
    .map((line) => line.slice(6));
  return dataLines.length > 0 ? dataLines.join("\n").trim() : null;
}

/**
 * Answer `userMessage` by streaming the completion, forwarding each response
 * fragment to `onChunk`, then record the exchange.
 *
 * The inference stream is read as SSE. Each event's data is JSON with optional
 * fields:
 *   - `response`: a text fragment
 *   - `model`
 *   - `done` + `tokenCount`
 *   - `error`
 * A `[DONE]` data line is ignored.
 *
 * Post-stream bookkeeping (saving the episode, triggering summarization,
 * auto-naming) is best-effort, as in chat(). It is skipped entirely when the
 * stream produced no text.
 *
 * @param {string} externalId - Caller-facing session identifier.
 * @param {string} userMessage - The message to answer.
 * @param {(text: string) => void} onChunk - Receives each response fragment.
 * @param {object} [options] - Per-call inference options. Sampling settings
 *   from assembleContext take precedence.
 * @returns {Promise<{model: string, tokenCount: number}>}
 * @throws {Error} If any of the following happens:
 *   - context assembly or the inference request fails;
 *   - the stream carries an `error` event;
 *   - `onChunk` throws.
 */
async function chatStream(externalId, userMessage, onChunk, options = {}) {
  try {
    const { session, prompt, isFirstMessage, inferenceOptions } = await assembleContext(externalId, userMessage);
    const res = await inference.completeStream(prompt, { ...options, ...inferenceOptions });

    // One streaming decoder carries incomplete multi-byte UTF-8 sequences over
    // to the next chunk. Decoding each chunk on its own turns a character split
    // across a chunk boundary into U+FFFD.
    const decoder = new TextDecoder("utf-8");
    let fullText = "";
    let model = "";
    let tokenCount = 0;
    let buffer = "";

    const handleEvent = (event) => {
      const raw = extractSseData(event);
      if (raw === null || raw === "[DONE]") return;
      let data;
      try {
        data = JSON.parse(raw);
      } catch (err) {
        logger.error("[orchestration] Failed to parse SSE event:", raw, err.message);
        return;
      }
      if (data.response) {
        fullText += data.response;
        onChunk(data.response);
      }
      if (data.model) model = data.model;
      if (data.done && data.tokenCount !== undefined) tokenCount = data.tokenCount;
      // Thrown outside the JSON.parse try/catch so an upstream error aborts the
      // stream instead of being logged as a parse failure and ignored.
      if (data.error) throw new Error(data.error);
    };

    for await (const chunk of res.body) {
      const text = typeof chunk === "string" ? chunk : decoder.decode(chunk, { stream: true });
      // Normalise CRLF so "\r\n\r\n"-terminated events split like "\n\n" ones.
      buffer = (buffer + text).replace(/\r\n/g, "\n");
      const events = buffer.split("\n\n");
      // The last element is an incomplete event (or ""); keep it for the next chunk.
      buffer = events.pop() ?? "";
      for (const event of events) handleEvent(event);
    }

    if (!fullText.trim()) {
      logger.warn("[orchestration] Stream finished with no assistant text; episode not saved");
      return { model, tokenCount };
    }

    // The response has already been streamed to the caller, so (as in chat())
    // persistence failures are logged rather than failing the request.
    try {
      await memory.createEpisode(session.id, userMessage, fullText, tokenCount, session.project_id ?? null);
    } catch (err) {
      logger.error("[orchestration] Failed to save episode:", err.message);
    }

    const logSummaryFailure = (err) => {
      logger.warn("[orchestration] Summary trigger failed (non-critical):", err?.message);
    };
    try {
      const allEpisodes = await memory.getRecentEpisodes(session.id, 9999);
      // Fire-and-forget; a returned promise gets a handler so a rejection is
      // logged rather than left unhandled.
      Promise.resolve(triggerSummary(session, allEpisodes)).catch(logSummaryFailure);
    } catch (err) {
      logSummaryFailure(err);
    }

    if (isFirstMessage && !session.name) {
      autoNameSession(externalId, userMessage, fullText).catch(() => {});
    }

    return { model, tokenCount };
  } catch (err) {
    logger.error("[orchestration] chatStream fatal error:", err.message, err.stack);
    throw err;
  }
}
module.exports = { chat, chatStream };