From 1f0d9acea800b9db6df4d5c58461e699aa0b6ce6 Mon Sep 17 00:00:00 2001 From: Storme-bit Date: Fri, 10 Apr 2026 04:31:51 -0700 Subject: [PATCH] orchestration fixes --- packages/chat-client/src/api/orchestration.js | 57 ++++++++++++++++ packages/memory-service/src/index.js | 8 +++ .../orchestration-service/src/chat/index.js | 67 ++++++++++++++++++- 3 files changed, 131 insertions(+), 1 deletion(-) diff --git a/packages/chat-client/src/api/orchestration.js b/packages/chat-client/src/api/orchestration.js index 60a9ee3..5f3674c 100644 --- a/packages/chat-client/src/api/orchestration.js +++ b/packages/chat-client/src/api/orchestration.js @@ -31,6 +31,7 @@ export async function sendMessage(sessionId, message, model) { // onChunk(text) called for each token // onDone({ model, tokenCount }) called when stream closes // returns an abort function — call it to cancel mid-stream +/* export function streamMessage(sessionId, message, model, { onChunk, onDone, onError }) { const controller = new AbortController(); @@ -80,7 +81,63 @@ export function streamMessage(sessionId, message, model, { onChunk, onDone, onEr return () => controller.abort(); } +*/ +export function streamMessage(sessionId, message, model, { onChunk, onDone, onError }) { + const controller = new AbortController(); + (async () => { + try { + const res = await fetch(`${BASE_URL}/chat/stream`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ sessionId, message, model }), + signal: controller.signal, + }); + + if (!res.ok) throw new Error(`Stream request failed: ${res.status}`); + + const reader = res.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + + const events = buffer.split('\n\n'); + buffer = events.pop() || ''; + + for (const event of events) { + const lines = event.split('\n'); + const dataLines = lines + .filter(line => line.startsWith('data: ')) + .map(line => line.slice(6)); + + if (dataLines.length === 0) continue; + + const raw = dataLines.join('\n').trim(); + if (raw === '[DONE]') continue; + + try { + const data = JSON.parse(raw); + + if (data.text) onChunk(data.text); + if (data.done) onDone({ model: data.model ?? model, tokenCount: data.tokenCount ?? 0 }); + if (data.error) onError(new Error(data.error)); + } catch (err) { + console.error('[chat-client] Failed to parse SSE event:', raw, err); + } + } + } + } catch (err) { + if (err.name !== 'AbortError') onError(err); + } + })(); + + return () => controller.abort(); +} export async function fetchModels() { const res = await fetch(`${BASE_URL}/models`); if(!res.ok) throw new Error(`Failted to fetch models: ${res.status}`); diff --git a/packages/memory-service/src/index.js b/packages/memory-service/src/index.js index 09796d3..fdfe90f 100644 --- a/packages/memory-service/src/index.js +++ b/packages/memory-service/src/index.js @@ -84,6 +84,14 @@ app.post('/episodes', async (req, res) => { return res.status(400).json({ error: 'sessionId, userMessage and aiResponse are required' }); } const episode = await episodic.createEpisode(sessionId, userMessage, aiResponse, tokenCount, metadata); + + console.log('[memory] create episode body:', { + sessionId, + userMessageLength: userMessage?.length, + aiResponseLength: aiResponse?.length, + tokenCount + }); + res.status(201).json(episode); }); diff --git a/packages/orchestration-service/src/chat/index.js b/packages/orchestration-service/src/chat/index.js index 0bc8526..b0e035a 100644 --- a/packages/orchestration-service/src/chat/index.js +++ b/packages/orchestration-service/src/chat/index.js @@ -87,7 +87,7 @@ async function chat(externalId, userMessage, options = {}) { tokenCount: (result.evalCount || 0 ) + (result.promptEvalCount || 0 ), }; } - +/* async function chatStream(externalId, userMessage, onChunk, options = {} ) { // 1. Resolve or create session let session = await memory.getSessionByExternalId(externalId); @@ -148,5 +148,70 @@ async function chatStream(externalId, userMessage, onChunk, options = {} ) { } return {model, tokenCount}; } +*/ +async function chatStream(externalId, userMessage, onChunk, options = {}) { + let session = await memory.getSessionByExternalId(externalId); + if (!session) session = await memory.createSession(externalId); + const recentEpisodes = await memory.getRecentEpisodes(session.id, RECENT_EPISODE_LIMIT); + const recentIds = new Set(recentEpisodes.map(e => e.id)); + const semanticEpisodes = await getSemanticEpisodes(userMessage, session.id, recentIds); + + const prompt = buildPrompt(recentEpisodes, semanticEpisodes, userMessage); + const res = await inference.completeStream(prompt, options); + + let fullText = ''; + let model = ''; + let tokenCount = 0; + let buffer = ''; + + for await (const chunk of res.body) { + buffer += Buffer.from(chunk).toString('utf8'); + + const events = buffer.split('\n\n'); + buffer = events.pop() || ''; + + for (const event of events) { + const lines = event.split('\n'); + const dataLines = lines + .filter(line => line.startsWith('data: ')) + .map(line => line.slice(6)); + + if (dataLines.length === 0) continue; + + const raw = dataLines.join('\n').trim(); + if (raw === '[DONE]') continue; + + try { + const data = JSON.parse(raw); + + if (data.response) { + fullText += data.response; + onChunk(data.response); + } + + if (data.model) model = data.model; + if (data.done && data.tokenCount !== undefined) { + tokenCount = data.tokenCount; + } + + if (data.error) { + throw new Error(data.error); + } + } catch (err) { + console.error('[orchestration] Failed to parse inference SSE event:', raw, err.message); + } + } + } + + console.log('[orchestration] final streamed text length:', fullText.length); + + if (fullText.trim()) { + await memory.createEpisode(session.id, userMessage, fullText, tokenCount); + } else { + console.warn('[orchestration] Stream finished with no assistant text; episode not saved'); + } + + return { model, tokenCount }; +} module.exports = { chat, chatStream }; \ No newline at end of file