diff --git a/packages/orchestration-service/src/chat/index.js b/packages/orchestration-service/src/chat/index.js
index 829d37a..3f7ef2e 100644
--- a/packages/orchestration-service/src/chat/index.js
+++ b/packages/orchestration-service/src/chat/index.js
@@ -93,4 +93,85 @@ async function chat(externalId, userMessage, options = {}) {
   };
 }
 
-module.exports = { chat };
\ No newline at end of file
+/**
+ * Streams a chat completion to the caller and records the exchange in memory.
+ *
+ * @param {string} externalId - External identifier used to look up or create the session.
+ * @param {string} userMessage - Message text sent by the user.
+ * @param {(delta: string) => void} onChunk - Invoked with every text fragment as it arrives.
+ *   Exceptions it throws propagate to the caller.
+ * @param {object} [options] - Forwarded unchanged to inference.completeStream.
+ * @returns {Promise<{model: string, tokenCount: number}>} Model name and total token count
+ *   reported by the stream (empty string / 0 when the stream never reported them).
+ */
+async function chatStream(externalId, userMessage, onChunk, options = {}) {
+  // 1. Resolve or create session
+  let session = await memory.getSessionByExternalId(externalId);
+  if (!session) session = await memory.createSession(externalId);
+
+  // 2. Context assembly
+  const recentEpisodes = await memory.getRecentEpisodes(session.id, RECENT_EPISODE_LIMIT);
+  const recentIds = new Set(recentEpisodes.map(e => e.id));
+  const semanticEpisodes = await getSemanticEpisodes(userMessage, session.id, recentIds);
+
+  // 3. Assemble prompt
+  const prompt = buildPrompt(recentEpisodes, semanticEpisodes, userMessage);
+
+  // 4. Open stream to inference service
+  const res = await inference.completeStream(prompt, options);
+
+  let fullText = '';
+  let model = '';
+  let tokenCount = 0;
+
+  // Processes one complete SSE line; anything that is not a JSON "data: " event is ignored.
+  const handleLine = (line) => {
+    if (!line.startsWith('data: ')) return;
+    const raw = line.slice(6).trim();
+    if (raw === '[DONE]') return; // stream closed sentinel
+
+    let data;
+    try {
+      data = JSON.parse(raw);
+    } catch (err) {
+      // Lines are complete at this point, so a parse failure is a malformed event, not a split one.
+      console.error('[orchestration] Skipping malformed stream event:', err.message);
+      return;
+    }
+
+    if (data.model) model = data.model;
+
+    if (data.response) {
+      fullText += data.response;
+      onChunk(data.response);
+    }
+
+    if (data.done && data.eval_count !== undefined) {
+      tokenCount = (data.eval_count || 0) + (data.prompt_eval_count || 0);
+    }
+  };
+
+  // 5. Parse SSE chunks
+  // Network chunks do not align with event boundaries: keep the unterminated tail in
+  // `pending` and only parse lines that have been fully received. The streaming decoder
+  // also keeps multi-byte UTF-8 sequences intact when they straddle two chunks.
+  const decoder = new TextDecoder();
+  let pending = '';
+  for await (const chunk of res.body) {
+    pending += typeof chunk === 'string' ? chunk : decoder.decode(chunk, { stream: true });
+    const lines = pending.split('\n');
+    pending = lines.pop();
+    for (const line of lines) handleLine(line);
+  }
+  pending += decoder.decode();
+  if (pending) handleLine(pending); // final event without a trailing newline
+
+  // 6. Write complete episode to memory
+  // Not awaited: a failed save is only logged and does not reject this function.
+  memory.createEpisode(session.id, userMessage, fullText, tokenCount)
+    .catch(err => console.error('[orchestration] Failed to save streamed episode:', err.message));
+
+  return { model, tokenCount };
+}
+
+module.exports = { chat, chatStream };
\ No newline at end of file
diff --git a/packages/orchestration-service/src/routes/chat.js b/packages/orchestration-service/src/routes/chat.js
index a66278f..fc0e676 100644
--- a/packages/orchestration-service/src/routes/chat.js
+++ b/packages/orchestration-service/src/routes/chat.js
@@ -1,5 +1,5 @@
 const { Router } = require('express')
-const { chat } = require('../chat/index');
+const { chat, chatStream } = require('../chat/index');
 const memory = require('../services/memory')
 
 const router = Router();
@@ -22,4 +22,30 @@ router.post('/', async (req, res) => {
   }
 });
 
+// POST /stream - relays chat text deltas to the client as server-sent events.
+router.post('/stream', async (req, res) => {
+  const { sessionId, message } = req.body;
+  if (!sessionId || !message) {
+    return res.status(400).json({
+      error: 'sessionId and message are required'
+    });
+  }
+
+  res.setHeader('Content-Type', 'text/event-stream');
+  res.setHeader('Cache-Control', 'no-cache');
+  res.setHeader('Connection', 'keep-alive');
+  res.flushHeaders();
+
+  try {
+    await chatStream(sessionId, message, (delta) => {
+      res.write(`data: ${JSON.stringify({ text: delta})}\n\n`);
+    });
+    res.write(`data: ${JSON.stringify({done: true})}\n\n`);
+  } catch (err) {
+    res.write(`data: ${JSON.stringify({error: err.message})}\n\n`);
+  } finally {
+    res.end();
+  }
+});
+
 module.exports = router;
\ No newline at end of file
diff --git a/packages/orchestration-service/src/services/inference.js b/packages/orchestration-service/src/services/inference.js
index ccb4f44..2363b16 100644
--- a/packages/orchestration-service/src/services/inference.js
+++ b/packages/orchestration-service/src/services/inference.js
@@ -13,6 +13,25 @@ async function complete(prompt, options ={}) {
   return res.json();
 }
 
+/**
+ * Opens a streaming completion request against the inference service.
+ *
+ * @param {*} prompt - Prompt value placed in the JSON request body.
+ * @param {object} [options] - Extra fields spread into the JSON request body.
+ * @returns {Promise<Response>} The fetch response, returned unread so the caller can consume `body`.
+ * @throws {Error} When the service responds with a non-OK status.
+ */
+async function completeStream(prompt, options={}) {
+  const res = await fetch(`${BASE_URL}/complete/stream`, {
+    method: 'POST',
+    headers: { 'Content-Type': 'application/json'},
+    body: JSON.stringify({prompt, ...options}),
+  });
+  if (!res.ok) throw new Error(`Inference service error: ${res.status}`);
+  return res;
+}
+
 module.exports = {
-  complete
+  complete,
+  completeStream
 }
\ No newline at end of file