From eccda219924e5ab770906d008a85d6e40a7577b7 Mon Sep 17 00:00:00 2001
From: Storme-bit
Date: Sun, 5 Apr 2026 05:27:04 -0700
Subject: [PATCH] Added orchestration layer

---
 .../orchestration-service/src/chat/index.js   | 85 +++++++++++++++++++
 packages/orchestration-service/src/index.js   | 16 ++--
 .../orchestration-service/src/routes/chat.js  | 33 +++++++
 .../src/services/inference.js                 | 27 ++++++
 .../src/services/memory.js                    | 79 +++++++++++++++++
 5 files changed, 232 insertions(+), 8 deletions(-)
 create mode 100644 packages/orchestration-service/src/chat/index.js
 create mode 100644 packages/orchestration-service/src/routes/chat.js
 create mode 100644 packages/orchestration-service/src/services/inference.js
 create mode 100644 packages/orchestration-service/src/services/memory.js

diff --git a/packages/orchestration-service/src/chat/index.js b/packages/orchestration-service/src/chat/index.js
new file mode 100644
index 0000000..6458f7f
--- /dev/null
+++ b/packages/orchestration-service/src/chat/index.js
@@ -0,0 +1,85 @@
+const memory = require('../services/memory');
+const inference = require('../services/inference');
+
+const SYSTEM_PROMPT = `You are a helpful, context-aware AI assistant.
+You have access to memories of past conversations with the user.
+Use them to provide consistent, personalised responses.`;
+
+const RECENT_EPISODE_LIMIT = 10; // Number of recent episodes to retrieve for context
+
+/**
+ * Build the completion prompt: system prompt, any recalled episodes, then
+ * the new user message followed by an open "Assistant:" turn.
+ *
+ * @param {Array<object>} recentEpisodes - Episodes to replay; each is read for
+ *   `user_message` and `ai_response`. Anything other than a non-empty array
+ *   is treated as "no memories".
+ * @param {string} userMessage - The message the user just sent.
+ * @returns {string} The prompt parts joined with newlines.
+ */
+function buildPrompt(recentEpisodes, userMessage) {
+    const parts = [SYSTEM_PROMPT];
+
+    if (Array.isArray(recentEpisodes) && recentEpisodes.length > 0) {
+        parts.push(`Here are some relevant memories from your past conversations:`);
+        for (const ep of recentEpisodes) {
+            parts.push(`User: ${ep.user_message}\nAssistant: ${ep.ai_response}`);
+        }
+        parts.push('--- End of recent memories ---\n');
+    }
+
+    parts.push(`User: ${userMessage}`);
+    parts.push('Assistant:');
+
+    return parts.join('\n');
+}
+
+/**
+ * Run one chat turn: resolve the session, load recent episodes, build the
+ * prompt, call inference, and record the exchange.
+ *
+ * @param {string} externalId - Caller-supplied session identifier.
+ * @param {string} userMessage - The message the user just sent.
+ * @param {object} [options] - Forwarded unchanged to `inference.complete`.
+ * @returns {Promise<object>} `{ sessionId, response, model, tokenCount }`.
+ * @throws {Error} If a session, episode-fetch, or inference call rejects.
+ */
+async function chat(externalId, userMessage, options = {}) {
+    // 1. Resolve or create session
+    let session = await memory.getSessionByExternalId(externalId);
+    if (!session) {
+        session = await memory.createSession(externalId);
+    }
+
+    // 2. Fetch recent episodes for context
+    const recentEpisodes = await memory.getRecentEpisodes(
+        session.id,
+        RECENT_EPISODE_LIMIT
+    );
+
+    // 3. Assemble prompt
+    const prompt = buildPrompt(recentEpisodes, userMessage);
+
+    // 4. Run inference
+    const result = await inference.complete(prompt, options);
+    const tokenCount = (result.evalCount || 0) + (result.promptEvalCount || 0);
+
+    // 5. Write episode back to memory. Deliberately not awaited: a failed
+    //    save is logged and must not fail the reply.
+    memory.createEpisode(
+        session.id,
+        userMessage,
+        result.text,
+        tokenCount
+    ).catch(err => console.error(`[orchestration] Failed to save episode`, err.message));
+
+    // 6. Return response
+    return {
+        sessionId: externalId,
+        response: result.text,
+        model: result.model,
+        tokenCount,
+    };
+}
+
+module.exports = { chat };
\ No newline at end of file
diff --git a/packages/orchestration-service/src/index.js b/packages/orchestration-service/src/index.js
index d62108b..41e7386 100644
--- a/packages/orchestration-service/src/index.js
+++ b/packages/orchestration-service/src/index.js
@@ -1,27 +1,27 @@
 require ('dotenv').config();
 const express = require('express');
 const {getEnv} = require('@nexusai/shared');
+const chatRouter = require('./routes/chat');
 
 const app = express();
 app.use(express.json());
 
-const PORT = getEnv('PORT', '3000'); // Default to 3000 if PORT is not set
-
-//Service URLS - loaded from .env on each node
-const MEMORY_URL = getEnv('MEMORY_SERVICE_URL', 'http://localhost:3002');
-const INFERENCE_URL = getEnv('INFERENCE_SERVICE_URL', 'http://localhost:3001');
-const EMBEDDING_URL = getEnv('EMBEDDING_SERVICE_URL', 'http://localhost:3003');
+const PORT = getEnv('PORT', '4000'); // Default to 4000 if PORT is not set
 
 // Health check endpoint
 app.get('/health', (req, res) => {
     res.json({
         service: 'Orchestration Service',
         status: 'healthy',
-        connections: {MEMORY_URL, INFERENCE_URL, EMBEDDING_URL}
+        memoryService: getEnv('MEMORY_SERVICE_URL', 'http://localhost:3002'),
+        embeddingService: getEnv('EMBEDDING_SERVICE_URL', 'http://localhost:3003'),
+        inferenceService: getEnv('INFERENCE_SERVICE_URL', 'http://localhost:3001'),
     });
 });
 
-// Start the server
+app.use('/chat', chatRouter);
+
+/******* Start the server ************/
 app.listen(PORT, () => {
     console.log(`Orchestration Service is running on port ${PORT}`);
 });
\ No newline at end of file
diff --git a/packages/orchestration-service/src/routes/chat.js b/packages/orchestration-service/src/routes/chat.js
new file mode 100644
index 0000000..822fdd4
--- /dev/null
+++ b/packages/orchestration-service/src/routes/chat.js
@@ -0,0 +1,33 @@
+const { Router } = require('express')
+const { chat } = require('../chat/index');
+
+const router = Router();
+
+/**
+ * POST handler for one chat turn.
+ * Body: { sessionId, message, model?, temperature? }.
+ * Responds 400 when sessionId or message is missing, 500 when chat() rejects.
+ *
+ * index.js mounts this router at '/chat', so '/' is reachable as POST /chat;
+ * '/chat' is kept so the POST /chat/chat path keeps working.
+ */
+router.post(['/', '/chat'], async (req, res) => {
+    // Tolerate a missing body so the request gets a 400 instead of a TypeError.
+    const { sessionId, message } = req.body || {};
+
+    if (!sessionId) return res.status(400).json({ error: 'sessionId is required'});
+    if (!message) return res.status(400).json({ error: 'message is required'});
+
+    try {
+        const result = await chat(sessionId, message, {
+            model: req.body.model,
+            temperature: req.body.temperature,
+        });
+        res.json(result)
+    } catch (err) {
+        console.error(`[orchestration] chat error: `, err.message)
+        res.status(500).json ({ error: err.message})
+    }
+});
+
+module.exports = router;
\ No newline at end of file
diff --git a/packages/orchestration-service/src/services/inference.js b/packages/orchestration-service/src/services/inference.js
new file mode 100644
index 0000000..ccb4f44
--- /dev/null
+++ b/packages/orchestration-service/src/services/inference.js
@@ -0,0 +1,27 @@
+const fetch = require('node-fetch');
+const { getEnv } = require('@nexusai/shared');
+
+const BASE_URL = getEnv('INFERENCE_SERVICE_URL', 'http://localhost:3001');
+
+/**
+ * Request a completion from the inference service (POST {BASE_URL}/complete).
+ *
+ * @param {string} prompt - Full prompt text to complete.
+ * @param {object} [options] - Extra fields merged into the JSON request body.
+ * @returns {Promise<object>} The service's parsed JSON response.
+ * @throws {Error} If the service answers with a non-2xx status.
+ */
+async function complete(prompt, options = {}) {
+    const res = await fetch(`${BASE_URL}/complete`, {
+        method: 'POST',
+        headers: { 'Content-Type': 'application/json' },
+        // prompt goes last so a stray options.prompt cannot overwrite it
+        body: JSON.stringify({ ...options, prompt })
+    });
+    if (!res.ok) throw new Error(`Inference service error: ${res.status} ${res.statusText}`);
+    return res.json();
+}
+
+module.exports = {
+    complete
+}
\ No newline at end of file
diff --git a/packages/orchestration-service/src/services/memory.js b/packages/orchestration-service/src/services/memory.js
new file mode 100644
index 0000000..b597297
--- /dev/null
+++ b/packages/orchestration-service/src/services/memory.js
@@ -0,0 +1,79 @@
+const fetch = require('node-fetch');
+const { getEnv } = require('@nexusai/shared');
+
+const BASE_URL = getEnv('MEMORY_SERVICE_URL', 'http://localhost:3002');
+
+/**
+ * Look up a session by its external ID.
+ *
+ * @param {string} externalId - Caller-supplied session identifier.
+ * @returns {Promise<object|null>} The session, or null when the service
+ *   answers 404 (not found) or 400 (bad request).
+ * @throws {Error} On any other non-2xx response.
+ */
+async function getSessionByExternalId(externalId) {
+    const res = await fetch(`${BASE_URL}/sessions/by-external/${encodeURIComponent(externalId)}`);
+
+    if (res.status === 404 || res.status === 400) return null; // Not found or bad request
+    if (!res.ok) throw new Error(`Memory service error: ${res.status} ${res.statusText}`); // Other errors
+
+    return res.json();
+}
+
+/**
+ * Create a new session for an external ID.
+ *
+ * @param {string} externalId - Caller-supplied session identifier.
+ * @returns {Promise<object>} The created session as returned by the service.
+ * @throws {Error} On a non-2xx response.
+ */
+async function createSession(externalId) {
+    const res = await fetch(`${BASE_URL}/sessions`, {
+        method: 'POST',
+        headers: { 'Content-Type': 'application/json' },
+        body: JSON.stringify({ externalId })
+    });
+    if (!res.ok) throw new Error(`Failed to create sessions: ${res.status} ${res.statusText}`);
+    return res.json();
+}
+
+/**
+ * Fetch the most recent episodes of a session.
+ *
+ * @param {string|number} sessionId - Memory-service session ID.
+ * @param {number} [limit=10] - Maximum number of episodes to request.
+ * @returns {Promise<object>} The service's parsed JSON response.
+ * @throws {Error} On a non-2xx response.
+ */
+async function getRecentEpisodes(sessionId, limit = 10) {
+    const res = await fetch(`${BASE_URL}/sessions/${encodeURIComponent(sessionId)}/episodes/recent?limit=${encodeURIComponent(limit)}`);
+    if (!res.ok) throw new Error(`Failed to fetch episodes: ${res.status} ${res.statusText}`);
+    return res.json();
+}
+
+/**
+ * Store one user/assistant exchange as an episode.
+ *
+ * @param {string|number} sessionId - Memory-service session ID.
+ * @param {string} userMessage - What the user sent.
+ * @param {string} aiResponse - What the model answered.
+ * @param {number} tokenCount - Token total recorded for the exchange.
+ * @returns {Promise<object>} The service's parsed JSON response.
+ * @throws {Error} On a non-2xx response.
+ */
+async function createEpisode(sessionId, userMessage, aiResponse, tokenCount) {
+    const res = await fetch(`${BASE_URL}/episodes`, {
+        method: 'POST',
+        headers: { 'Content-Type': 'application/json' },
+        body: JSON.stringify({ sessionId, userMessage, aiResponse, tokenCount })
+    });
+    if (!res.ok) throw new Error(`Failed to create episode: ${res.status} ${res.statusText}`);
+    return res.json();
+}
+
+module.exports = {
+    getSessionByExternalId,
+    createSession,
+    getRecentEpisodes,
+    createEpisode
+}
\ No newline at end of file