added chat streaming
This commit is contained in:
@@ -93,4 +93,60 @@ async function chat(externalId, userMessage, options = {}) {
|
||||
};
|
||||
}
|
||||
|
||||
module.exports = { chat };
|
||||
async function chatStream(externalId, userMessage, onChunk, options = {} ) {
|
||||
// 1. Resolve or create session
|
||||
let session = await memory.getSessionByExternalId(externalId);
|
||||
if (!session) session = await memory.createSession(externalId);
|
||||
|
||||
// 2. Context assembly
|
||||
const recentEpisodes = await memory.getRecentEpisodes(session.id, RECENT_EPISODE_LIMIT);
|
||||
const recentIds = new Set(recentEpisodes.map(e => e.id));
|
||||
const semanticEpisodes = await getSemanticEpisodes(userMessage, session.id, recentIds)
|
||||
|
||||
// 3. Assemble Prompt
|
||||
const prompt = buildPrompt(recentEpisodes, semanticEpisodes, userMessage);
|
||||
|
||||
// 4. Open stream to inference service
|
||||
const res = await inference.completeStream(prompt, options);
|
||||
|
||||
let fullText = '';
|
||||
let model = '';
|
||||
let tokenCount = 0;
|
||||
|
||||
// 5. Parse SSE chunks
|
||||
for await (const chunk of res.body){
|
||||
const lines = chunk.toString().split('\n');
|
||||
|
||||
for (const line of lines) {
|
||||
if (!line.startsWith('data: ')) continue;
|
||||
const raw = line.slice(6).trim();
|
||||
if (raw === '[DONE]') continue //stream closed sentinel
|
||||
|
||||
try {
|
||||
const data = JSON.parse(raw);
|
||||
if (data.model) model = data.model
|
||||
|
||||
if (data.response) {
|
||||
fullText += data.response;
|
||||
onChunk(data.response);
|
||||
}
|
||||
|
||||
if (data.done && data.eval_count !== undefined) {
|
||||
tokenCount = (data.eval_count || 0) + (data.prompt_eval_count || 0)
|
||||
}
|
||||
} catch {
|
||||
//partial chunk
|
||||
//skip and wait for next
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// 6. Write Complete episode to memory
|
||||
memory.createEpisode(session.id, userMessage, fullText, tokenCount)
|
||||
.catch(err => console.error('[orchestration] Failed to save streamed episode:', err.message))
|
||||
|
||||
return {model, tokenCount};
|
||||
}
|
||||
|
||||
module.exports = { chat, chatStream };
|
||||
@@ -1,5 +1,5 @@
|
||||
const { Router } = require('express')
|
||||
const { chat } = require('../chat/index');
|
||||
const { chat, chatStream } = require('../chat/index');
|
||||
const memory = require('../services/memory')
|
||||
|
||||
const router = Router();
|
||||
@@ -22,4 +22,29 @@ router.post('/', async (req, res) => {
|
||||
}
|
||||
});
|
||||
|
||||
router.post('/stream', async (req, res) => {
|
||||
const {sessionId, message} = req.body;
|
||||
if(!sessionId || !message) {
|
||||
return res.status(400).json({
|
||||
error: 'sessionId and message are required'
|
||||
});
|
||||
}
|
||||
|
||||
res.setHeader('Content-Type', 'text/event-stream');
|
||||
res.setHeader('Cache-Control', 'no-cache');
|
||||
res.setHeader('Connection', 'keep-alive');
|
||||
res.flushHeaders();
|
||||
|
||||
try {
|
||||
await chatStream(sessionId, message, (delta) => {
|
||||
res.write(`data: ${JSON.stringify({ text: delta})}\n\n`)
|
||||
})
|
||||
res.write(`data: ${JSON.stringify({done: true})}\n\n`);
|
||||
} catch (err) {
|
||||
res.write(`data: ${JSON.stringify({error: err.message})}\n\n`);
|
||||
} finally {
|
||||
res.end();
|
||||
}
|
||||
});
|
||||
|
||||
module.exports = router;
|
||||
@@ -13,6 +13,17 @@ async function complete(prompt, options ={}) {
|
||||
return res.json();
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
complete
|
||||
async function completeStream(prompt, options={}) {
|
||||
const res = await fetch(`${BASE_URL}/complete/stream`, {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json'},
|
||||
body: JSON.stringify({prompt, ...options}),
|
||||
})
|
||||
if (!res.ok) throw new Error(`Inference service error: ${res.status}`);
|
||||
return res;
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
complete,
|
||||
completeStream
|
||||
}
|
||||
Reference in New Issue
Block a user