orchestration fixes

This commit is contained in:
Storme-bit
2026-04-10 04:31:51 -07:00
parent 7e8d71c877
commit 1f0d9acea8
3 changed files with 131 additions and 1 deletions

View File

@@ -31,6 +31,7 @@ export async function sendMessage(sessionId, message, model) {
// onChunk(text) called for each token // onChunk(text) called for each token
// onDone({ model, tokenCount }) called when stream closes // onDone({ model, tokenCount }) called when stream closes
// returns an abort function — call it to cancel mid-stream // returns an abort function — call it to cancel mid-stream
/*
export function streamMessage(sessionId, message, model, { onChunk, onDone, onError }) { export function streamMessage(sessionId, message, model, { onChunk, onDone, onError }) {
const controller = new AbortController(); const controller = new AbortController();
@@ -80,7 +81,63 @@ export function streamMessage(sessionId, message, model, { onChunk, onDone, onEr
return () => controller.abort(); return () => controller.abort();
} }
*/
/**
 * Extracts the payload text from one SSE event block.
 *
 * Only lines beginning with `data: ` contribute; multiple data lines are
 * joined with a newline and the result is trimmed.
 *
 * @param {string} event - Text of a single event (without the blank-line separator).
 * @returns {string|null} The payload, or null when the event carries no data
 *   lines or is the `[DONE]` sentinel.
 */
function readSseEventData(event) {
  const dataLines = event
    .split('\n')
    .filter((line) => line.startsWith('data: '))
    .map((line) => line.slice(6));
  if (dataLines.length === 0) return null;
  const raw = dataLines.join('\n').trim();
  return raw === '[DONE]' ? null : raw;
}

/**
 * Streams a chat reply by POSTing to `${BASE_URL}/chat/stream` and reading the
 * response body as server-sent events.
 *
 * @param {*} sessionId - Sent as `sessionId` in the JSON request body.
 * @param {*} message - Sent as `message` in the JSON request body.
 * @param {*} model - Sent as `model`; also the fallback model reported to onDone.
 * @param {object} callbacks
 * @param {(text: string) => void} callbacks.onChunk - Called for each event with a truthy `text`.
 * @param {(info: {model: *, tokenCount: number}) => void} callbacks.onDone -
 *   Called for each event with a truthy `done`.
 * @param {(err: Error) => void} callbacks.onError - Called for events carrying
 *   `error`, for a non-OK response, and for any other failure except an abort.
 * @returns {() => void} Function that aborts the in-flight request.
 */
export function streamMessage(sessionId, message, model, { onChunk, onDone, onError }) {
  const controller = new AbortController();

  // Parses one complete event and dispatches it to the callbacks.
  const handleEvent = (event) => {
    const raw = readSseEventData(event);
    if (raw === null) return;

    let data;
    try {
      data = JSON.parse(raw);
    } catch (err) {
      console.error('[chat-client] Failed to parse SSE event:', raw, err);
      return;
    }

    // Callbacks run outside the parse try/catch so that an exception thrown by
    // a callback is not mislabelled as a parse failure and swallowed; it
    // reaches the outer catch and is reported through onError instead.
    if (data?.text) onChunk(data.text);
    if (data?.done) onDone({ model: data.model ?? model, tokenCount: data.tokenCount ?? 0 });
    if (data?.error) onError(new Error(data.error));
  };

  (async () => {
    try {
      const res = await fetch(`${BASE_URL}/chat/stream`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ sessionId, message, model }),
        signal: controller.signal,
      });
      if (!res.ok) throw new Error(`Stream request failed: ${res.status}`);

      const reader = res.body.getReader();
      const decoder = new TextDecoder();
      let buffer = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        buffer += decoder.decode(value, { stream: true });
        // Events are separated by a blank line; the last piece may be
        // incomplete, so it stays in the buffer until more bytes arrive.
        const events = buffer.split('\n\n');
        buffer = events.pop() || '';
        for (const event of events) handleEvent(event);
      }

      // Flush bytes still held by the decoder, then process a final event that
      // was not followed by a blank line before the stream closed.
      buffer += decoder.decode();
      if (buffer.trim()) handleEvent(buffer);
    } catch (err) {
      // An abort is a caller-requested cancellation, not a failure.
      if (err.name !== 'AbortError') onError(err);
    }
  })();

  return () => controller.abort();
}
export async function fetchModels() { export async function fetchModels() {
const res = await fetch(`${BASE_URL}/models`); const res = await fetch(`${BASE_URL}/models`);
if(!res.ok) throw new Error(`Failted to fetch models: ${res.status}`); if(!res.ok) throw new Error(`Failted to fetch models: ${res.status}`);

View File

@@ -84,6 +84,14 @@ app.post('/episodes', async (req, res) => {
return res.status(400).json({ error: 'sessionId, userMessage and aiResponse are required' }); return res.status(400).json({ error: 'sessionId, userMessage and aiResponse are required' });
} }
const episode = await episodic.createEpisode(sessionId, userMessage, aiResponse, tokenCount, metadata); const episode = await episodic.createEpisode(sessionId, userMessage, aiResponse, tokenCount, metadata);
console.log('[memory] create episode body:', {
sessionId,
userMessageLength: userMessage?.length,
aiResponseLength: aiResponse?.length,
tokenCount
});
res.status(201).json(episode); res.status(201).json(episode);
}); });

View File

@@ -87,7 +87,7 @@ async function chat(externalId, userMessage, options = {}) {
tokenCount: (result.evalCount || 0 ) + (result.promptEvalCount || 0 ), tokenCount: (result.evalCount || 0 ) + (result.promptEvalCount || 0 ),
}; };
} }
/*
async function chatStream(externalId, userMessage, onChunk, options = {} ) { async function chatStream(externalId, userMessage, onChunk, options = {} ) {
// 1. Resolve or create session // 1. Resolve or create session
let session = await memory.getSessionByExternalId(externalId); let session = await memory.getSessionByExternalId(externalId);
@@ -148,5 +148,70 @@ async function chatStream(externalId, userMessage, onChunk, options = {} ) {
} }
return {model, tokenCount}; return {model, tokenCount};
} }
*/
/**
 * Extracts the payload text from one SSE event block emitted by the inference
 * stream.
 *
 * Only lines beginning with `data: ` contribute; multiple data lines are
 * joined with a newline and the result is trimmed.
 *
 * @param {string} event - Text of a single event (without the blank-line separator).
 * @returns {string|null} The payload, or null when the event carries no data
 *   lines or is the `[DONE]` sentinel.
 */
function extractInferencePayload(event) {
  const dataLines = event
    .split('\n')
    .filter((line) => line.startsWith('data: '))
    .map((line) => line.slice(6));
  if (dataLines.length === 0) return null;
  const raw = dataLines.join('\n').trim();
  return raw === '[DONE]' ? null : raw;
}

/**
 * Runs one streamed chat turn: resolves (or creates) the session, builds the
 * prompt from recent and semantic episodes, streams the completion to
 * `onChunk`, and stores the finished exchange as an episode.
 *
 * @param {*} externalId - Identifier used to look up or create the session.
 * @param {string} userMessage - The user's message for this turn.
 * @param {(text: string) => void} onChunk - Called with each `response` fragment.
 * @param {object} [options={}] - Passed through to `inference.completeStream`.
 * @returns {Promise<{model: string, tokenCount: number}>} Model name and token
 *   count as reported by the stream ('' and 0 when never reported).
 * @throws {Error} When a stream event carries an `error` field, or when
 *   `onChunk` or any awaited dependency throws. No episode is saved in that case.
 */
async function chatStream(externalId, userMessage, onChunk, options = {}) {
  let session = await memory.getSessionByExternalId(externalId);
  if (!session) session = await memory.createSession(externalId);

  const recentEpisodes = await memory.getRecentEpisodes(session.id, RECENT_EPISODE_LIMIT);
  // Ids of the recent episodes are handed to the semantic lookup alongside the query.
  const recentIds = new Set(recentEpisodes.map((e) => e.id));
  const semanticEpisodes = await getSemanticEpisodes(userMessage, session.id, recentIds);
  const prompt = buildPrompt(recentEpisodes, semanticEpisodes, userMessage);

  const res = await inference.completeStream(prompt, options);

  let fullText = '';
  let model = '';
  let tokenCount = 0;

  // Parses one complete event and folds it into the running state.
  const handleEvent = (event) => {
    const raw = extractInferencePayload(event);
    if (raw === null) return;

    let data;
    try {
      data = JSON.parse(raw);
    } catch (err) {
      console.error('[orchestration] Failed to parse inference SSE event:', raw, err.message);
      return;
    }

    if (data?.response) {
      fullText += data.response;
      onChunk(data.response);
    }
    if (data?.model) model = data.model;
    if (data?.done && data.tokenCount !== undefined) {
      tokenCount = data.tokenCount;
    }
    // Thrown outside the parse try/catch so an upstream inference error rejects
    // this call instead of being logged as a parse failure and ignored.
    if (data?.error) {
      throw new Error(data.error);
    }
  };

  // A streaming decoder keeps the bytes of a multi-byte UTF-8 character that
  // is split across two chunks together; decoding each chunk on its own would
  // replace both halves with U+FFFD.
  const decoder = new TextDecoder('utf-8');
  let buffer = '';

  for await (const chunk of res.body) {
    buffer += typeof chunk === 'string' ? chunk : decoder.decode(chunk, { stream: true });
    // Events are separated by a blank line; the last piece may be incomplete,
    // so it stays in the buffer until more bytes arrive.
    const events = buffer.split('\n\n');
    buffer = events.pop() || '';
    for (const event of events) handleEvent(event);
  }

  // Flush bytes still held by the decoder, then process a final event that
  // was not followed by a blank line before the stream closed.
  buffer += decoder.decode();
  if (buffer.trim()) handleEvent(buffer);

  console.log('[orchestration] final streamed text length:', fullText.length);
  if (fullText.trim()) {
    await memory.createEpisode(session.id, userMessage, fullText, tokenCount);
  } else {
    console.warn('[orchestration] Stream finished with no assistant text; episode not saved');
  }
  return { model, tokenCount };
}
module.exports = { chat, chatStream }; module.exports = { chat, chatStream };