'use strict';

const express = require('express');
const { WebSocketServer } = require('ws');
const fs = require('fs');
const path = require('path');
const readline = require('readline');
const http = require('http');
const { URL } = require('url');

const PORT = Number.parseInt(process.env.PORT || '3850', 10);
const SESSIONS_DIR = process.env.SESSIONS_DIR || '/home/node/.openclaw/agents';

/** A session whose file was modified within this window is reported as "active". */
const ACTIVE_WINDOW_MS = 5 * 60 * 1000;
/** Maximum number of session files parsed at the same time (avoids EMFILE). */
const SUMMARY_CONCURRENCY = 32;
/** Polling period used when fs.watch is unavailable or fails. */
const POLL_INTERVAL_MS = 1000;
/** Numeric value of the OPEN readyState of a `ws` WebSocket. */
const WS_OPEN = 1;

const app = express();
const server = http.createServer(app);
const wss = new WebSocketServer({ noServer: true });

// ---------------------------------------------------------------------------
// Security helpers
// ---------------------------------------------------------------------------

/**
 * Prevent path traversal: only allow alphanumerics, dots, hyphens and
 * underscores, and reject anything that could address the current or a
 * parent directory.
 *
 * @param {unknown} name - Candidate agent name or session id.
 * @returns {boolean} True when `name` is safe to use as a single path segment.
 */
function isValidName(name) {
  return (
    typeof name === 'string' &&
    name.length > 0 &&
    name.length <= 256 &&
    /^[a-zA-Z0-9._-]+$/.test(name) &&
    name !== '.' && // "." would resolve to the parent directory itself
    !name.includes('..')
  );
}

// ---------------------------------------------------------------------------
// File discovery
// ---------------------------------------------------------------------------

/**
 * List the agent directories found directly under SESSIONS_DIR.
 *
 * @returns {string[]} Directory names; empty when SESSIONS_DIR cannot be read.
 */
function getAgentDirs() {
  try {
    return fs
      .readdirSync(SESSIONS_DIR, { withFileTypes: true })
      .filter(entry => entry.isDirectory())
      .map(entry => entry.name);
  } catch {
    return [];
  }
}

/**
 * Returns all session file descriptors for an agent.
 * Handles both plain `uuid.jsonl` and archived `uuid.jsonl.deleted.*` /
 * `uuid.jsonl.reset.*`.
 *
 * @param {string} agent - Agent directory name.
 * @returns {{filename: string, filePath: string, agent: string,
 *   fileSessionId: string, status: ('deleted'|'reset'|null)}[]}
 *   One descriptor per matching file; empty on invalid agent or read failure.
 */
function getSessionFiles(agent) {
  if (!isValidName(agent)) return [];
  const sessionsPath = path.join(SESSIONS_DIR, agent, 'sessions');
  try {
    return fs
      .readdirSync(sessionsPath)
      .filter(filename => filename.includes('.jsonl'))
      .map(filename => {
        // The session ID is the part before the first `.jsonl`
        const fileSessionId = filename.slice(0, filename.indexOf('.jsonl'));

        let status = null; // null = active or idle (determined later by mtime)
        if (filename.includes('.jsonl.deleted')) status = 'deleted';
        else if (filename.includes('.jsonl.reset')) status = 'reset';

        return {
          filename,
          filePath: path.join(sessionsPath, filename),
          agent,
          fileSessionId,
          status,
        };
      });
  } catch {
    return [];
  }
}

/**
 * Find the file path for a given (agent, sessionId) pair.
 *
 * When several files share the same session id (a live `id.jsonl` next to
 * archived `id.jsonl.reset.*` copies), the live file is preferred so the
 * result does not depend on directory listing order.
 *
 * @param {string} agent - Agent directory name.
 * @param {string} sessionId - Session id (file name up to the first `.jsonl`).
 * @returns {string|null} Absolute file path, or null when nothing matches.
 */
function findSessionFile(agent, sessionId) {
  if (!isValidName(agent) || !isValidName(sessionId)) return null;
  const candidates = getSessionFiles(agent).filter(f => f.fileSessionId === sessionId);
  const match = candidates.find(f => f.status === null) ?? candidates[0];
  return match ? match.filePath : null;
}

// ---------------------------------------------------------------------------
// JSONL parsing helpers
// ---------------------------------------------------------------------------

/**
 * Read all non-empty lines from a file.
 *
 * @param {string} filePath - File to read as UTF-8.
 * @returns {Promise<string[]>} Lines that contain non-whitespace characters.
 */
function readLines(filePath) {
  return new Promise((resolve, reject) => {
    const lines = [];
    const stream = fs.createReadStream(filePath, { encoding: 'utf8' });
    const rl = readline.createInterface({ input: stream, crlfDelay: Infinity });
    rl.on('line', line => {
      if (line.trim()) lines.push(line);
    });
    rl.on('close', () => resolve(lines));
    rl.on('error', reject);
    stream.on('error', reject);
  });
}

/**
 * Parse a session file and return a summary object.
 * Skips malformed lines silently.
 *
 * @param {{filePath: string, fileSessionId: string, agent: string,
 *   status: (string|null)}} fileInfo - Descriptor from getSessionFiles().
 * @returns {Promise<object|null>} Summary, or null when the file is unreadable.
 */
async function parseSessionSummary(fileInfo) {
  const { filePath, fileSessionId, agent, status: fileStatus } = fileInfo;
  try {
    const lines = await readLines(filePath);

    let sessionId = null;
    let cwd = null;
    let model = null;
    let messageCount = 0;
    let userMessages = 0;
    let assistantMessages = 0;
    let totalTokens = 0;
    let totalCost = 0;
    let firstTimestamp = null;
    let lastTimestamp = null;

    for (const line of lines) {
      try {
        const event = JSON.parse(line);

        // Session start event
        if (event.type === 'session') {
          sessionId = event.id;
          cwd = event.cwd;
          if (!firstTimestamp && event.timestamp) firstTimestamp = event.timestamp;
          continue;
        }

        // Model snapshot (most reliable model source)
        if (
          event.type === 'custom' &&
          event.customType === 'model-snapshot' &&
          event.data?.modelId
        ) {
          model = event.data.modelId;
          continue;
        }

        // Model change event: only used when nothing better is known yet
        if (event.type === 'model_change' && event.modelId) {
          if (!model) model = event.modelId;
          continue;
        }

        // Message events
        if (event.type === 'message') {
          const msg = event.message || {};
          messageCount++;

          if (event.timestamp) {
            if (!firstTimestamp) firstTimestamp = event.timestamp;
            lastTimestamp = event.timestamp;
          }

          if (msg.usage) {
            totalTokens += msg.usage.totalTokens || 0;
            totalCost += msg.usage.cost?.total || 0;
          }

          // Extract model from assistant messages (skip proxy names)
          if (!model && msg.model && msg.model !== 'delivery-mirror') {
            model = msg.model;
          }

          if (msg.role === 'user') userMessages++;
          if (msg.role === 'assistant') assistantMessages++;
        }
      } catch {
        // Skip malformed line (invalid JSON or a non-object value)
      }
    }

    // Determine status: file suffix wins, otherwise check mtime
    let status = fileStatus;
    if (!status) {
      try {
        const { mtimeMs } = await fs.promises.stat(filePath);
        status = Date.now() - mtimeMs < ACTIVE_WINDOW_MS ? 'active' : 'idle';
      } catch {
        status = 'idle';
      }
    }

    return {
      agent,
      fileSessionId,
      sessionId: sessionId || fileSessionId,
      cwd,
      model,
      status,
      messageCount,
      userMessages,
      assistantMessages,
      totalTokens,
      totalCost,
      firstTimestamp,
      lastTimestamp,
    };
  } catch {
    return null;
  }
}

/**
 * Parse all events from a session file (for the detail view).
 *
 * @param {string} filePath - Session file to read.
 * @returns {Promise<unknown[]>} Parsed JSON values, malformed lines omitted.
 */
async function parseAllEvents(filePath) {
  const lines = await readLines(filePath);
  const events = [];
  for (const line of lines) {
    try {
      events.push(JSON.parse(line));
    } catch {
      // Skip malformed line
    }
  }
  return events;
}

/**
 * Run `fn` over `items` with at most `limit` calls in flight, preserving
 * input order in the result. Keeps the number of simultaneously open files
 * bounded when many sessions exist.
 *
 * @param {Array} items - Inputs.
 * @param {number} limit - Maximum number of concurrent calls.
 * @param {function(*): Promise<*>} fn - Async mapper.
 * @returns {Promise<Array>} Results in the same order as `items`.
 */
async function mapWithConcurrency(items, limit, fn) {
  const results = new Array(items.length);
  let nextIndex = 0;
  const worker = async () => {
    while (nextIndex < items.length) {
      const index = nextIndex++;
      results[index] = await fn(items[index]);
    }
  };
  const workerCount = Math.min(limit, items.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

/**
 * Convert a timestamp of unknown shape (ISO string, numeric string, epoch
 * number) to milliseconds. Unparseable values map to 0.
 *
 * @param {unknown} timestamp - Raw timestamp taken from an event.
 * @returns {number} Milliseconds since the epoch, or 0.
 */
function timestampToMs(timestamp) {
  if (typeof timestamp === 'number') return Number.isFinite(timestamp) ? timestamp : 0;
  if (typeof timestamp !== 'string') return 0;
  const parsed = Date.parse(timestamp);
  if (!Number.isNaN(parsed)) return parsed;
  const numeric = Number(timestamp);
  return Number.isFinite(numeric) ? numeric : 0;
}

/**
 * Comparator that orders session summaries by last activity, newest first.
 * Falls back to a string comparison when the parsed times are equal so that
 * timestamps in an unrecognised format still sort deterministically.
 *
 * @param {object} a - Session summary.
 * @param {object} b - Session summary.
 * @returns {number} Negative when `a` should come first.
 */
function compareByLastActivityDesc(a, b) {
  const aRaw = a.lastTimestamp || a.firstTimestamp || '';
  const bRaw = b.lastTimestamp || b.firstTimestamp || '';
  const diff = timestampToMs(bRaw) - timestampToMs(aRaw);
  if (diff !== 0) return diff;
  return String(bRaw).localeCompare(String(aRaw));
}

// ---------------------------------------------------------------------------
// API routes
// ---------------------------------------------------------------------------

/** GET /api/sessions — list all sessions across all agents */
app.get('/api/sessions', async (req, res) => {
  try {
    const fileInfos = getAgentDirs().flatMap(agent => getSessionFiles(agent));

    // Parse session files in parallel, but with a cap on open files
    const results = await mapWithConcurrency(
      fileInfos,
      SUMMARY_CONCURRENCY,
      parseSessionSummary,
    );
    const sessions = results.filter(Boolean);

    // Sort by last activity desc (most recent first)
    sessions.sort(compareByLastActivityDesc);

    res.json(sessions);
  } catch (err) {
    console.error('GET /api/sessions error:', err);
    res.status(500).json({ error: err.message });
  }
});

/** GET /api/sessions/:agent/:id — full event log for one session */
app.get('/api/sessions/:agent/:id', async (req, res) => {
  const { agent, id } = req.params;

  if (!isValidName(agent) || !isValidName(id)) {
    return res.status(400).json({ error: 'Invalid parameters' });
  }

  const filePath = findSessionFile(agent, id);
  if (!filePath) {
    return res.status(404).json({ error: 'Session not found' });
  }

  try {
    const events = await parseAllEvents(filePath);
    res.json(events);
  } catch (err) {
    console.error(`GET /api/sessions/${agent}/${id} error:`, err);
    res.status(500).json({ error: err.message });
  }
});

// ---------------------------------------------------------------------------
// WebSocket — real-time tail of a session file
// ---------------------------------------------------------------------------

/**
 * Create a reader that, each time it is called, reads whatever was appended
 * to `filePath` since the previous call and passes every complete, valid
 * JSON line to `onLine`.
 *
 * Bytes are buffered until a newline is seen, so a multi-byte UTF-8
 * character or a JSON line split across two reads is never decoded in
 * halves.
 *
 * @param {string} filePath - File to tail.
 * @param {function(string): void} onLine - Called with each valid JSON line.
 * @returns {function(): void} Reader; throws on file system errors.
 */
function createLineTailer(filePath, onLine) {
  let position = 0;
  let pending = Buffer.alloc(0); // bytes after the last newline seen so far

  return function readNewLines() {
    const { size } = fs.statSync(filePath);

    // File shrank (truncated or replaced): start over from the beginning
    if (size < position) {
      position = 0;
      pending = Buffer.alloc(0);
    }
    if (size === position) return;

    const chunk = Buffer.alloc(size - position);
    const fd = fs.openSync(filePath, 'r');
    let bytesRead;
    try {
      bytesRead = fs.readSync(fd, chunk, 0, chunk.length, position);
    } finally {
      fs.closeSync(fd); // never leak the descriptor, even if the read fails
    }
    if (bytesRead === 0) return;
    position += bytesRead; // a read may return fewer bytes than requested

    const data = Buffer.concat([pending, chunk.subarray(0, bytesRead)]);
    const lastNewline = data.lastIndexOf(0x0a);
    if (lastNewline === -1) {
      pending = data; // still no complete line
      return;
    }
    // Copy the tail so the (possibly large) `data` buffer can be released
    pending = Buffer.from(data.subarray(lastNewline + 1));

    for (const line of data.toString('utf8', 0, lastNewline).split('\n')) {
      if (!line.trim()) continue;
      try {
        JSON.parse(line); // validate only; the raw line is forwarded
      } catch {
        continue; // skip malformed line
      }
      onLine(line);
    }
  };
}

server.on('upgrade', (request, socket, head) => {
  try {
    const { pathname } = new URL(request.url, 'http://localhost');
    if (pathname.startsWith('/ws/sessions/')) {
      wss.handleUpgrade(request, socket, head, ws => {
        wss.emit('connection', ws, request);
      });
    } else {
      socket.destroy();
    }
  } catch {
    socket.destroy();
  }
});

wss.on('connection', (ws, request) => {
  // Without a listener, an 'error' event on the socket would crash the process
  ws.on('error', err => console.error('WebSocket client error:', err));

  try {
    const { pathname } = new URL(request.url, 'http://localhost');
    // Pattern: /ws/sessions/:agent/:id
    const match = pathname.match(/^\/ws\/sessions\/([^/]+)\/(.+)$/);
    if (!match) {
      ws.close(1008, 'Invalid path');
      return;
    }

    const [, agent, sessionId] = match;
    const filePath = findSessionFile(agent, sessionId);
    if (!filePath) {
      ws.close(1008, 'Session not found');
      return;
    }

    const readNewLines = createLineTailer(filePath, line => ws.send(line));

    /** Read any new bytes since last read and emit parsed events */
    const sendNewContent = () => {
      if (ws.readyState !== WS_OPEN) return;
      try {
        readNewLines();
      } catch (err) {
        // A vanished file is expected (session deleted/archived); report the rest
        if (err.code !== 'ENOENT') {
          console.error(`Tail error for ${filePath}:`, err);
        }
      }
    };

    // Send all existing content immediately
    sendNewContent();

    let stopWatching = () => {};
    const startPolling = () => {
      const interval = setInterval(sendNewContent, POLL_INTERVAL_MS);
      stopWatching = () => clearInterval(interval);
    };

    // Watch for new writes
    try {
      const watcher = fs.watch(filePath, () => sendNewContent());
      stopWatching = () => watcher.close();
      watcher.on('error', err => {
        // An unhandled FSWatcher 'error' would crash the process; degrade to polling
        console.error(`fs.watch error for ${filePath}:`, err);
        try {
          watcher.close();
        } catch {
          /* ignore */
        }
        stopWatching = () => {};
        if (ws.readyState === WS_OPEN) startPolling();
      });
    } catch {
      // fs.watch unavailable — fall back to 1s polling
      startPolling();
    }

    ws.on('close', () => {
      try {
        stopWatching();
      } catch {
        /* ignore */
      }
    });
  } catch (err) {
    console.error('WebSocket error:', err);
    try {
      ws.close(1011, 'Internal error');
    } catch {
      /* ignore */
    }
  }
});

// ---------------------------------------------------------------------------
// Static files + SPA fallback
// ---------------------------------------------------------------------------

app.use(express.static(path.join(__dirname, 'public')));

app.get('*', (_req, res) => {
  res.sendFile(path.join(__dirname, 'public', 'index.html'));
});

// ---------------------------------------------------------------------------
// Start
// ---------------------------------------------------------------------------

server.listen(PORT, () => {
  console.log(`OpenClaw Dashboard → http://localhost:${PORT}`);
  console.log(`Sessions directory → ${SESSIONS_DIR}`);
});