Files
nexusAI/packages/orchestration-service/src/chat/index.js
2026-04-10 04:19:14 -07:00

152 lines
5.2 KiB
JavaScript

const memory = require('../services/memory');
const inference = require('../services/inference');
const embedding = require('../services/embedding');
const qdrant = require('../services/qdrant');
const { ORCHESTRATION } = require('@nexusai/shared')
const { RECENT_EPISODE_LIMIT, SEMANTIC_LIMIT, SCORE_THRESHOLD, SYSTEM_PROMPT } = ORCHESTRATION;
/**
 * Build the plain-text prompt sent to the inference service.
 *
 * Layout, joined with newlines: the system prompt, an optional block of
 * semantically retrieved episodes, an optional block of recent episodes,
 * the current user message, and a trailing `Assistant:` cue.
 *
 * @param {Array<object>} recentEpisodes - Episodes rendered in the second block; each is read for `user_message` and `ai_response`.
 * @param {Array<object>} semanticEpisodes - Episodes rendered in the first block; same fields are read.
 * @param {string} userMessage - The message for the current turn.
 * @returns {string} The assembled prompt.
 */
function buildPrompt(recentEpisodes, semanticEpisodes, userMessage) {
  // Every stored exchange is rendered the same way in both blocks.
  const renderExchange = (episode) =>
    `User: ${episode.user_message}\nAssistant: ${episode.ai_response}`;

  const semanticBlock = semanticEpisodes.length > 0
    ? [
      'Here are some relevant memories from earlier conversations:',
      ...semanticEpisodes.map(renderExchange),
      '---',
    ]
    : [];

  // NOTE(review): this header is worded almost like the semantic one even
  // though the block closes with "End of recent memories" — confirm intended.
  const recentBlock = recentEpisodes.length > 0
    ? [
      `Here are some relevant memories from your past conversations:`,
      ...recentEpisodes.map(renderExchange),
      '--- End of recent memories ---\n',
    ]
    : [];

  const sections = [
    SYSTEM_PROMPT,
    ...semanticBlock,
    ...recentBlock,
    `User: ${userMessage}`,
    'Assistant:',
  ];
  return sections.join('\n');
}
/**
 * Look up stored episodes that are semantically close to the user's message.
 *
 * The message is embedded, the vector is searched via `qdrant.searchEpisodes`,
 * hits whose id is already in `recentIds` are dropped, and the remaining ids
 * are loaded through `memory.getEpisodeById`. Falsy lookups are discarded.
 *
 * This is best-effort: any failure along the way is logged as a warning and
 * an empty list is returned so the caller can proceed without this context.
 *
 * @param {string} userMessage - Text to embed and search with.
 * @param {*} sessionId - Forwarded to the vector search as `sessionId`.
 * @param {Set<*>} recentIds - Episode ids to exclude from the results (only `.has` is used).
 * @returns {Promise<Array<object>>} Loaded episodes, or `[]` on failure.
 */
async function getSemanticEpisodes(userMessage, sessionId, recentIds) {
  try {
    const queryVector = await embedding.embed(userMessage);

    const searchOptions = {
      limit: SEMANTIC_LIMIT,
      scoreThreshold: SCORE_THRESHOLD,
      sessionId,
    };
    const hits = await qdrant.searchEpisodes(queryVector, searchOptions);

    // Skip anything the caller already has, then load the rest concurrently.
    const unseenHits = hits.filter((hit) => !recentIds.has(hit.id));
    const lookups = unseenHits.map((hit) => memory.getEpisodeById(hit.id));
    const episodes = await Promise.all(lookups);

    return episodes.filter((episode) => Boolean(episode));
  } catch (err) {
    console.warn(`[orchestration] Semantic search failed, continuing without: `, err.message);
    return [];
  }
}
/**
 * Handle one non-streaming chat turn.
 *
 * Resolves (or creates) the session for `externalId`, gathers recent and
 * semantically related episodes, builds the prompt, runs inference, and
 * records the exchange in memory without waiting for the write to finish.
 *
 * @param {*} externalId - Caller-facing session identifier (presumably a string); echoed back as `sessionId`.
 * @param {string} userMessage - The user's message for this turn.
 * @param {object} [options={}] - Passed through unchanged to `inference.complete`.
 * @returns {Promise<{sessionId: *, response: *, model: *, tokenCount: number}>}
 *   `response` and `model` are taken straight from the inference result;
 *   `tokenCount` is `evalCount + promptEvalCount`, with missing counts treated as 0.
 */
async function chat(externalId, userMessage, options = {}) {
  // 1. Resolve or create session
  let session = await memory.getSessionByExternalId(externalId);
  if (!session) session = await memory.createSession(externalId);

  // 2. Fetch recent episodes for context
  const recentEpisodes = await memory.getRecentEpisodes(session.id, RECENT_EPISODE_LIMIT);
  const recentIds = new Set(recentEpisodes.map((e) => e.id));

  // 3. Semantic search (excludes episodes already present in the recent list)
  const semanticEpisodes = await getSemanticEpisodes(userMessage, session.id, recentIds);

  // 4. Assemble prompt
  const prompt = buildPrompt(recentEpisodes, semanticEpisodes, userMessage);

  // 5. Run inference
  const result = await inference.complete(prompt, options);

  // Computed once so the stored episode and the returned payload always agree.
  const tokenCount = (result.evalCount || 0) + (result.promptEvalCount || 0);

  // 6. Write episode back to memory.
  // Blank or missing completions are not persisted (same rule chatStream
  // applies): such an episode would later be replayed into prompts as an
  // empty or "undefined" assistant turn.
  const hasResponse = typeof result.text === 'string' && result.text.trim() !== '';
  if (hasResponse) {
    // Deliberately not awaited: a slow or failed write must not delay or
    // fail the reply. Failures are logged instead.
    memory.createEpisode(session.id, userMessage, result.text, tokenCount)
      .catch((err) => console.error(`[orchestration] Failed to save episode`, err.message));
  }

  // 7. Return response
  return {
    sessionId: externalId,
    response: result.text,
    model: result.model,
    tokenCount,
  };
}
/**
 * Handle one streaming chat turn.
 *
 * Resolves (or creates) the session, builds the prompt from recent and
 * semantically related episodes, opens a streaming completion, and forwards
 * each text fragment to `onChunk` as it arrives. Once the stream ends, the
 * full response is recorded in memory (not awaited) if it is non-blank.
 *
 * The response body is read as a server-sent-event style stream: only lines
 * beginning with `data:` are considered, `[DONE]` is ignored, and every other
 * payload is parsed as JSON with optional `response`, `model`, `done` and
 * `tokenCount` fields.
 *
 * @param {*} externalId - Caller-facing session identifier (presumably a string).
 * @param {string} userMessage - The user's message for this turn.
 * @param {function(string): void} onChunk - Called with each text fragment, in order.
 * @param {object} [options={}] - Passed through unchanged to `inference.completeStream`.
 * @returns {Promise<{model: string, tokenCount: number}>} The last model name seen
 *   (`''` if none) and the token count reported on the final event (`0` if none).
 */
async function chatStream(externalId, userMessage, onChunk, options = {}) {
  // 1. Resolve or create session
  let session = await memory.getSessionByExternalId(externalId);
  if (!session) session = await memory.createSession(externalId);

  // 2. Context assembly
  const recentEpisodes = await memory.getRecentEpisodes(session.id, RECENT_EPISODE_LIMIT);
  const recentIds = new Set(recentEpisodes.map((e) => e.id));
  const semanticEpisodes = await getSemanticEpisodes(userMessage, session.id, recentIds);

  // 3. Assemble prompt
  const prompt = buildPrompt(recentEpisodes, semanticEpisodes, userMessage);

  // 4. Open stream to inference service
  const res = await inference.completeStream(prompt, options);

  let fullText = '';
  let model = '';
  let tokenCount = 0;

  // Processes exactly one complete line of the event stream.
  const handleLine = (line) => {
    if (!line.startsWith('data:')) return;
    // trim() drops the optional space after the colon and any trailing '\r'.
    const raw = line.slice('data:'.length).trim();
    if (raw === '' || raw === '[DONE]') return;

    try {
      const data = JSON.parse(raw);

      // Text fragment (noted in the original as the llama.cpp provider shape).
      if (data.response) {
        fullText += data.response;
        onChunk(data.response);
      }

      // Model name and token count are expected on the final ("done") event.
      if (data.model) model = data.model;
      if (data.done && data.tokenCount !== undefined) {
        tokenCount = data.tokenCount;
      }
    } catch (err) {
      // Lines are complete at this point, so this is a malformed payload or a
      // throwing onChunk. Keep streaming (as before), but leave a trace.
      console.warn('[orchestration] Skipping unreadable stream event:', err && err.message);
    }
  };

  // 5. Parse SSE chunks.
  // Network chunks do not respect line or character boundaries, so:
  //  - a streaming decoder holds back incomplete multi-byte UTF-8 sequences;
  //  - `pending` carries the unfinished last line over to the next chunk.
  const decoder = new TextDecoder();
  let pending = '';

  for await (const chunk of res.body) {
    pending += typeof chunk === 'string' ? chunk : decoder.decode(chunk, { stream: true });

    const lines = pending.split('\n');
    pending = lines.pop(); // last piece has no newline yet; may be ''
    for (const line of lines) handleLine(line);
  }

  // Flush the decoder and handle a final line that had no trailing newline.
  pending += decoder.decode();
  if (pending) handleLine(pending);

  // 6. Write complete episode to memory (fire-and-forget; failures are logged)
  if (fullText && fullText.trim()) {
    memory.createEpisode(session.id, userMessage, fullText, tokenCount)
      .catch((err) => console.error('[orchestration] Failed to save streamed episode:', err.message));
  }

  return { model, tokenCount };
}
// Public entry points: `chat` resolves with the full response in one go;
// `chatStream` delivers text fragments through a callback as they arrive.
module.exports = { chat, chatStream };