Files
PromdataPanel/server/index.js
CN-JS-HuiBai bc8414df3d 优化
2026-04-06 17:49:31 +08:00

1036 lines
36 KiB
JavaScript

require('dotenv').config();
const express = require('express');
const cors = require('cors');
const path = require('path');
const db = require('./db');
const prometheusService = require('./prometheus-service');
const cache = require('./cache');
const geoService = require('./geo-service');
const latencyService = require('./latency-service');
const checkAndFixDatabase = require('./db-integrity-check');
const http = require('http');
const WebSocket = require('ws');
const app = express();
const PORT = process.env.PORT || 3000;
const HOST = process.env.HOST || '0.0.0.0';
app.use(cors());
app.use(express.json());
const fs = require('fs');
const crypto = require('crypto');
let isDbInitialized = false;
const sessions = new Map(); // Simple session store: sessionId -> {userId, username}
/**
 * Express middleware that only lets requests with a known session through.
 *
 * Reads the `session_id` cookie and looks it up in the in-memory `sessions`
 * map. On a hit the stored session object is exposed as `req.user` and
 * `next()` is called; otherwise the request is answered with HTTP 401 and
 * `{ error: 'Auth required' }`.
 *
 * @param {object} req - Incoming request; `req.user` is set on success.
 * @param {object} res - Response used for the 401 reply.
 * @param {Function} next - Continuation invoked for authenticated requests.
 * @returns {*} The result of `next()`, or undefined after replying with 401.
 */
function requireAuth(req, res, next) {
  const token = getCookie(req, 'session_id');
  const isKnownSession = Boolean(token) && sessions.has(token);

  if (!isKnownSession) {
    res.status(401).json({ error: 'Auth required' });
    return;
  }

  req.user = sessions.get(token);
  return next();
}
/**
 * Read a single cookie value from the request's `Cookie` header.
 *
 * The cookie name is regex-escaped before it is used in the lookup pattern,
 * and the matched value is URI-decoded. A value with malformed
 * percent-encoding is treated as absent instead of letting the URIError from
 * `decodeURIComponent` escape into the caller.
 *
 * @param {object} req - Request exposing `headers.cookie`.
 * @param {string} name - Cookie name to look up.
 * @returns {string|undefined} Decoded cookie value, or undefined when the
 *   header is missing, the cookie is not present, or its value cannot be decoded.
 */
function getCookie(req, name) {
  const header = req.headers.cookie;
  if (!header) return undefined;

  const escapedName = name.replace(/([\.$?*|{}\(\)\[\]\\\/\+^])/g, '\\$1');
  const matches = header.match(new RegExp('(?:^|; )' + escapedName + '=([^;]*)'));
  if (!matches) return undefined;

  try {
    return decodeURIComponent(matches[1]);
  } catch (err) {
    // Malformed percent-encoding (e.g. "%E0%A4%A"): behave as if the cookie was not sent.
    return undefined;
  }
}
/**
 * Refresh the module-level `isDbInitialized` flag.
 *
 * The flag becomes true only when a `.env` file exists one directory above
 * this file AND the `prometheus_sources` table is found via SHOW TABLES.
 * Any error (including a failing query) leaves the flag false.
 *
 * @returns {Promise<void>}
 */
async function checkDb() {
  try {
    const envFile = path.join(__dirname, '..', '.env');
    if (fs.existsSync(envFile)) {
      const [tables] = await db.query("SHOW TABLES LIKE 'prometheus_sources'");
      isDbInitialized = tables.length > 0;
    } else {
      isDbInitialized = false;
    }
  } catch (err) {
    isDbInitialized = false;
  }
}
// Fire-and-forget at startup: the flag stays false until this completes.
checkDb();
// --- Health API ---
/**
 * GET /health
 *
 * Reports process information plus the results of `db.checkHealth()` and
 * `cache.checkHealth()`. Responds 200 when both report status 'up', otherwise
 * 500 with the same payload. If a health check itself throws, responds 500
 * with `{ status: 'error', message }`.
 */
app.get('/health', async (req, res) => {
  const toMegabytes = (bytes) => Math.floor(bytes / 1024 / 1024) + ' MB';
  const describeCheck = (name, result) => ({
    name,
    status: result.status,
    message: result.error || 'Connected'
  });

  try {
    const dbStatus = await db.checkHealth();
    const cacheStatus = await cache.checkHealth();
    const everythingUp = dbStatus.status === 'up' && cacheStatus.status === 'up';
    const memory = process.memoryUsage();

    const report = {
      status: everythingUp ? 'ok' : 'error',
      timestamp: new Date().toISOString(),
      service: {
        status: 'running',
        uptime: Math.floor(process.uptime()),
        memory_usage: {
          rss: toMegabytes(memory.rss),
          heapTotal: toMegabytes(memory.heapTotal)
        },
        node_version: process.version
      },
      checks: {
        database: describeCheck('MySQL', dbStatus),
        valkey: describeCheck('Valkey (Redis)', cacheStatus)
      }
    };

    res.status(everythingUp ? 200 : 500).json(report);
  } catch (err) {
    res.status(500).json({ status: 'error', message: err.message });
  }
});
// --- Auth API ---
/**
 * POST /api/auth/login
 *
 * Body: `{ username, password }` (both non-empty strings).
 * Looks the user up by name, derives a PBKDF2-SHA512 hash (1000 iterations,
 * 64 bytes) from the supplied password and the stored salt, and compares it
 * with the stored hash. On success a random session id is stored in the
 * in-memory `sessions` map and sent back as an HttpOnly `session_id` cookie.
 *
 * Responses: 200 `{ success, username }`, 400 for missing/non-string
 * credentials, 401 for unknown user or wrong password, 500 on any other error.
 */
app.post('/api/auth/login', async (req, res) => {
  const { username, password } = req.body || {};

  // Only plain non-empty strings are accepted. Anything else would either make
  // pbkdf2Sync throw (surfacing as a 500) or reach the SQL layer as a
  // non-scalar parameter, whose handling depends on how db.query binds values.
  if (typeof username !== 'string' || typeof password !== 'string' || !username || !password) {
    return res.status(400).json({ error: 'Username and password are required' });
  }

  try {
    const [rows] = await db.query('SELECT * FROM users WHERE username = ?', [username]);
    if (rows.length === 0) return res.status(401).json({ error: 'Invalid credentials' });

    const user = rows[0];
    const derivedHex = crypto.pbkdf2Sync(password, user.salt, 1000, 64, 'sha512').toString('hex');

    // Constant-time comparison; timingSafeEqual requires equal-length buffers,
    // so the length check comes first.
    const derived = Buffer.from(derivedHex);
    const stored = Buffer.from(String(user.password));
    const passwordMatches = derived.length === stored.length && crypto.timingSafeEqual(derived, stored);

    if (!passwordMatches) {
      return res.status(401).json({ error: 'Invalid credentials' });
    }

    const sessionId = crypto.randomBytes(32).toString('hex');
    sessions.set(sessionId, { id: user.id, username: user.username });
    res.setHeader('Set-Cookie', `session_id=${sessionId}; Path=/; HttpOnly; SameSite=Strict; Max-Age=86400`);
    res.json({ success: true, username: user.username });
  } catch (err) {
    res.status(500).json({ error: 'Login failed' });
  }
});
/**
 * POST /api/auth/logout
 *
 * Drops the session referenced by the `session_id` cookie (if any) from the
 * in-memory store and expires the cookie. Always answers `{ success: true }`.
 */
app.post('/api/auth/logout', (req, res) => {
  const token = getCookie(req, 'session_id');
  if (token) {
    sessions.delete(token);
  }

  // Max-Age=0 tells the browser to discard the cookie immediately.
  const expiredCookie = 'session_id=; Path=/; HttpOnly; Max-Age=0';
  res.setHeader('Set-Cookie', expiredCookie);
  res.json({ success: true });
});
/**
 * POST /api/auth/change-password (requires a valid session)
 *
 * Body: `{ oldPassword, newPassword }` (both non-empty strings).
 * Verifies the old password against the stored PBKDF2-SHA512 hash, then
 * stores a hash of the new password under a freshly generated salt.
 *
 * Responses: 200 on success, 400 for missing/non-string input, 401 when the
 * old password is wrong, 404 when the session's user no longer exists,
 * 500 on any other error.
 */
app.post('/api/auth/change-password', requireAuth, async (req, res) => {
  const { oldPassword, newPassword } = req.body || {};

  // Non-string values would make pbkdf2Sync throw and surface as a 500, so
  // they are rejected together with missing values.
  if (!oldPassword || !newPassword || typeof oldPassword !== 'string' || typeof newPassword !== 'string') {
    return res.status(400).json({ error: '需要输入旧密码和新密码' });
  }

  try {
    const [rows] = await db.query('SELECT * FROM users WHERE id = ?', [req.user.id]);
    if (rows.length === 0) return res.status(404).json({ error: '用户不存在' });

    const user = rows[0];
    const oldHashHex = crypto.pbkdf2Sync(oldPassword, user.salt, 1000, 64, 'sha512').toString('hex');

    // Constant-time comparison; the length check is required by timingSafeEqual.
    const derived = Buffer.from(oldHashHex);
    const stored = Buffer.from(String(user.password));
    const oldPasswordMatches = derived.length === stored.length && crypto.timingSafeEqual(derived, stored);
    if (!oldPasswordMatches) {
      return res.status(401).json({ error: '旧密码输入错误' });
    }

    const newSalt = crypto.randomBytes(16).toString('hex');
    const newHash = crypto.pbkdf2Sync(newPassword, newSalt, 1000, 64, 'sha512').toString('hex');
    await db.query('UPDATE users SET password = ?, salt = ? WHERE id = ?', [newHash, newSalt, user.id]);
    res.json({ success: true, message: '密码修改成功' });
  } catch (err) {
    console.error('Password update error:', err);
    res.status(500).json({ error: '服务器错误,修改失败' });
  }
});
/**
 * GET /api/auth/status
 *
 * Tells the client whether its `session_id` cookie maps to a live session.
 * Answers `{ authenticated: true, username }` or `{ authenticated: false }`.
 */
app.get('/api/auth/status', (req, res) => {
  const token = getCookie(req, 'session_id');
  const loggedIn = Boolean(token) && sessions.has(token);

  if (!loggedIn) {
    res.json({ authenticated: false });
    return;
  }

  const { username } = sessions.get(token);
  res.json({ authenticated: true, username });
});
// Setup API Routes
/**
 * POST /api/setup/test
 *
 * Body: `{ host, port, user, password }` (defaults: localhost, 3306, root, '').
 * Opens a MySQL connection with the given credentials, pings it and closes it.
 * Responds 200 `{ success: true }` or 400 `{ success: false, error }`.
 *
 * NOTE(review): this route performs no authentication check of its own —
 * confirm that is intended once setup has been completed.
 */
app.post('/api/setup/test', async (req, res) => {
  const { host, port, user, password } = req.body || {};
  try {
    const mysql = require('mysql2/promise');
    const connection = await mysql.createConnection({
      host: host || 'localhost',
      port: Number.parseInt(port, 10) || 3306,
      user: user || 'root',
      password: password || ''
    });

    try {
      await connection.ping();
      await connection.end();
    } catch (probeErr) {
      // Release the socket even when the ping or the graceful close failed.
      connection.destroy();
      throw probeErr;
    }

    res.json({ success: true, message: 'Connection successful' });
  } catch (err) {
    res.status(400).json({ success: false, error: err.message });
  }
});
/**
 * POST /api/setup/test-valkey
 *
 * Body: `{ host, port, password }` (defaults: localhost, 6379, no password).
 * Connects to the Valkey/Redis server, sends PING and disconnects.
 * Responds 200 `{ success: true }` or 400 `{ success: false, error }`.
 */
app.post('/api/setup/test-valkey', async (req, res) => {
  const { host, port, password } = req.body || {};
  try {
    const Redis = require('ioredis');
    const redis = new Redis({
      host: host || 'localhost',
      port: Number.parseInt(port, 10) || 6379,
      password: password || undefined,
      lazyConnect: true,
      maxRetriesPerRequest: 1,
      connectTimeout: 5000
    });

    try {
      await redis.connect();
      await redis.ping();
    } finally {
      // Always tear the client down; previously a failed connect/ping left it
      // alive and it was never disconnected.
      redis.disconnect();
    }

    res.json({ success: true, message: 'Valkey connection successful' });
  } catch (err) {
    res.status(400).json({ success: false, error: err.message });
  }
});
/**
 * POST /api/setup/init
 *
 * Body: `{ host, port, user, password, database, vHost, vPort, vPassword }`.
 * Creates the database and the base tables, seeds the default site settings,
 * writes the resolved configuration to `../.env`, mirrors it into
 * `process.env`, re-initialises the DB pool and cache, and marks the database
 * as initialized.
 *
 * Defaults are resolved once and used everywhere (connection, `.env`,
 * `process.env`), so an omitted field can no longer end up in `process.env`
 * as the literal string "undefined".
 *
 * Responses: 200 `{ success: true }`, 400 when a value contains a line break,
 * 500 `{ success: false, error }` on any failure.
 *
 * NOTE(review): this route performs no authentication check of its own and
 * rewrites `.env` — confirm whether it should be locked once setup is complete.
 */
app.post('/api/setup/init', async (req, res) => {
  const { host, port, user, password, database, vHost, vPort, vPassword } = req.body || {};

  // Key order matters: it is the order the entries are written to .env.
  const settings = {
    MYSQL_HOST: String(host || 'localhost'),
    MYSQL_PORT: String(port || '3306'),
    MYSQL_USER: String(user || 'root'),
    MYSQL_PASSWORD: String(password || ''),
    MYSQL_DATABASE: String(database || 'display_wall'),
    VALKEY_HOST: String(vHost || 'localhost'),
    VALKEY_PORT: String(vPort || '6379'),
    VALKEY_PASSWORD: String(vPassword || '')
  };

  // A line break inside a value would inject extra entries into the .env file.
  if (Object.values(settings).some((value) => /[\r\n]/.test(value))) {
    return res.status(400).json({ success: false, error: 'Configuration values must not contain line breaks' });
  }

  const dbName = settings.MYSQL_DATABASE;
  // Double embedded backticks so the name cannot break out of the quoted identifier.
  const quotedDbName = '`' + dbName.replace(/`/g, '``') + '`';

  let connection = null;
  try {
    const mysql = require('mysql2/promise');
    connection = await mysql.createConnection({
      host: settings.MYSQL_HOST,
      port: Number.parseInt(settings.MYSQL_PORT, 10) || 3306,
      user: settings.MYSQL_USER,
      password: settings.MYSQL_PASSWORD
    });

    // Create database
    await connection.query(`CREATE DATABASE IF NOT EXISTS ${quotedDbName} CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci`);
    await connection.query(`USE ${quotedDbName}`);

    // Create tables
    await connection.query(`
CREATE TABLE IF NOT EXISTS prometheus_sources (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
url VARCHAR(500) NOT NULL,
description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
`);
    await connection.query(`
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(255) NOT NULL UNIQUE,
password VARCHAR(255) NOT NULL,
salt VARCHAR(255) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
`);
    await connection.query(`
CREATE TABLE IF NOT EXISTS traffic_stats (
id INT AUTO_INCREMENT PRIMARY KEY,
rx_bytes BIGINT UNSIGNED DEFAULT 0,
tx_bytes BIGINT UNSIGNED DEFAULT 0,
rx_bandwidth DOUBLE DEFAULT 0,
tx_bandwidth DOUBLE DEFAULT 0,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE INDEX (timestamp)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
`);
    await connection.query(`
CREATE TABLE IF NOT EXISTS site_settings (
id INT PRIMARY KEY DEFAULT 1,
page_name VARCHAR(255) DEFAULT '数据可视化展示大屏',
title VARCHAR(255) DEFAULT '数据可视化展示大屏',
logo_url TEXT,
logo_url_dark TEXT,
favicon_url TEXT,
default_theme VARCHAR(20) DEFAULT 'dark',
show_95_bandwidth TINYINT(1) DEFAULT 0,
p95_type VARCHAR(20) DEFAULT 'tx',
blackbox_source_id INT,
latency_source VARCHAR(100),
latency_dest VARCHAR(100),
latency_target VARCHAR(255),
icp_filing VARCHAR(255),
ps_filing VARCHAR(255),
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
`);
    await connection.query(`
INSERT IGNORE INTO site_settings (id, page_name, title, default_theme, show_95_bandwidth, p95_type)
VALUES (1, '数据可视化展示大屏', '数据可视化展示大屏', 'dark', 0, 'tx')
`);
    await connection.end();
    connection = null; // closed gracefully; nothing left for the finally block to release

    // Save to .env (same entries and order as before, one KEY=value per line)
    const envLines = [
      ...Object.entries(settings).map(([key, value]) => `${key}=${value}`),
      `PORT=${process.env.PORT || 3000}`,
      `HOST=${process.env.HOST || '0.0.0.0'}`,
      `REFRESH_INTERVAL=${process.env.REFRESH_INTERVAL || 5000}`
    ];
    fs.writeFileSync(path.join(__dirname, '..', '.env'), envLines.join('\n') + '\n');

    // Update process.env with the resolved values (never `undefined`)
    Object.assign(process.env, settings);

    // Re-initialize pools
    db.initPool();
    cache.init();
    isDbInitialized = true;
    res.json({ success: true, message: 'Initialization complete' });
  } catch (err) {
    console.error('Initialization error:', err);
    res.status(500).json({ success: false, error: err.message });
  } finally {
    // Still set only when a step failed before the graceful end() completed.
    if (connection) connection.destroy();
  }
});
// Setup Status Check
/**
 * GET /api/setup/status
 *
 * Reports which setup step the installation is at:
 *   - `{ initialized: false, step: 'db' }` when the database is not ready
 *     (or the user count query fails),
 *   - `{ initialized: true, needsAdmin: true, step: 'admin' }` when no user exists,
 *   - `{ initialized: true, needsAdmin: false, step: 'prom' }` otherwise.
 */
app.get('/api/setup/status', async (req, res) => {
  const dbPending = { initialized: false, step: 'db' };
  try {
    if (!isDbInitialized) {
      return res.json(dbPending);
    }

    const [rows] = await db.query('SELECT COUNT(*) as count FROM users');
    const needsAdmin = rows[0].count === 0;
    if (needsAdmin) {
      return res.json({ initialized: true, needsAdmin: true, step: 'admin' });
    }
    res.json({ initialized: true, needsAdmin: false, step: 'prom' });
  } catch (err) {
    console.error('Status check error:', err);
    res.json({ initialized: false, step: 'db' });
  }
});
// Create First Admin
/**
 * POST /api/setup/admin
 *
 * Body: `{ username, password }` (both non-empty strings).
 * Creates the first user, but only while the `users` table is empty, then
 * logs that user in by creating a session and setting the `session_id` cookie.
 *
 * Responses: 200 on success, 400 for missing/non-string input, 403 when a user
 * already exists, 500 `{ error }` on any other failure.
 */
app.post('/api/setup/admin', async (req, res) => {
  const { username, password } = req.body || {};
  if (!username || !password) return res.status(400).json({ error: 'Username and password are required' });
  // Non-string values would make pbkdf2Sync throw (500 with the raw error
  // text) or reach the INSERT as a non-scalar parameter.
  if (typeof username !== 'string' || typeof password !== 'string') {
    return res.status(400).json({ error: 'Username and password must be strings' });
  }

  try {
    const [rows] = await db.query('SELECT COUNT(*) as count FROM users');
    if (rows[0].count > 0) return res.status(403).json({ error: 'Admin already exists' });

    const salt = crypto.randomBytes(16).toString('hex');
    const hash = crypto.pbkdf2Sync(password, salt, 1000, 64, 'sha512').toString('hex');
    await db.query('INSERT INTO users (username, password, salt) VALUES (?, ?, ?)', [username, hash, salt]);

    const [userRows] = await db.query('SELECT id, username FROM users WHERE username = ?', [username]);
    const user = userRows[0];

    // Auto-login after creation so the next setup steps (like adding Prometheus) work without 401
    const sessionId = crypto.randomBytes(32).toString('hex');
    sessions.set(sessionId, { id: user.id, username: user.username });
    res.setHeader('Set-Cookie', `session_id=${sessionId}; Path=/; HttpOnly; SameSite=Strict; Max-Age=86400`);
    res.json({ success: true, message: 'Admin account created and logged in' });
  } catch (err) {
    console.error('Admin creation error:', err);
    res.status(500).json({ error: err.message });
  }
});
// Middleware to protect routes & enforce setup
/**
 * Gate every request registered after this point on the setup state.
 *
 * 1. `/health`, `/api/setup*`, `/init.html` and the `/css/`, `/js/`, `/fonts/`
 *    assets always pass.
 * 2. While the database is not initialized: API calls get 503
 *    `{ needSetup: true }`, everything else is redirected to `/init.html`.
 * 3. While no user exists: API calls get 503 `{ needAdmin: true }`, everything
 *    else is redirected to `/init.html?step=admin`. A failing user count query
 *    is ignored and the request continues.
 */
app.use(async (req, res, next) => {
  const requestPath = req.path;
  const isApiRequest = requestPath.startsWith('/api/');

  // Allow system files and setup APIs
  const assetPrefixes = ['/css/', '/js/', '/fonts/'];
  const alwaysAllowed =
    requestPath === '/health' ||
    requestPath.startsWith('/api/setup') ||
    requestPath === '/init.html' ||
    assetPrefixes.some((prefix) => requestPath.startsWith(prefix));
  if (alwaysAllowed) {
    return next();
  }

  // Enforce DB setup
  if (!isDbInitialized) {
    return isApiRequest
      ? res.status(503).json({ error: 'Database not initialized', needSetup: true })
      : res.redirect('/init.html');
  }

  // Enforce User setup
  try {
    const [rows] = await db.query('SELECT COUNT(*) as count FROM users');
    const hasNoUsers = rows[0].count === 0;
    if (hasNoUsers) {
      return isApiRequest
        ? res.status(503).json({ error: 'Admin not configured', needAdmin: true })
        : res.redirect('/init.html?step=admin');
    }
  } catch (err) {
    // If table doesn't exist, it's a DB initialization issue
  }

  // NOTE(review): this branch cannot be reached — '/init.html' already returned
  // through the allow-list above. Confirm whether a completed setup is meant to
  // redirect away from the init page.
  if (requestPath === '/init.html') {
    return res.redirect('/');
  }
  next();
});
// Helper to serve index.html with injected settings
/**
 * Send `public/index.html` with the site settings embedded as
 * `window.SITE_SETTINGS` in a `<script>` placed right after `<head>`.
 *
 * Settings come from `site_settings` row 1 when the database is initialized
 * and the row exists; otherwise built-in defaults are used.
 *
 * Responds 404 when index.html is missing and 500 on unexpected errors.
 *
 * @param {object} req - Express request (unused).
 * @param {object} res - Express response.
 * @returns {Promise<void>}
 */
const serveIndex = async (req, res) => {
  try {
    const indexPath = path.join(__dirname, '..', 'public', 'index.html');
    if (!fs.existsSync(indexPath)) return res.status(404).send('Not found');
    let html = fs.readFileSync(indexPath, 'utf8');

    // Fetch settings
    let settings = {
      page_name: '数据可视化展示大屏',
      show_page_name: 1,
      title: '数据可视化展示大屏',
      logo_url: null,
      logo_url_dark: null,
      favicon_url: null,
      default_theme: 'dark',
      blackbox_source_id: null,
      latency_source: null,
      latency_dest: null,
      latency_target: null,
      icp_filing: null,
      ps_filing: null
    };
    if (isDbInitialized) {
      try {
        const [rows] = await db.query('SELECT * FROM site_settings WHERE id = 1');
        if (rows.length > 0) settings = rows[0];
      } catch (e) {
        // DB not ready or table missing
      }
    }

    // Inject settings. "<" is emitted as \u003c so a stored value containing
    // "</script>" cannot terminate the inline script; the parsed JSON is unchanged.
    const settingsJson = JSON.stringify(settings).replace(/</g, '\\u003c');
    const injection = `<script>window.SITE_SETTINGS = ${settingsJson};</script>`;

    // Replace <head> with <head> + injection. A replacer function is used so
    // "$&", "$'" etc. inside the settings are not treated as replacement patterns.
    html = html.replace('<head>', () => '<head>' + injection);
    res.send(html);
  } catch (err) {
    console.error('Error serving index:', err);
    res.status(500).send('Internal Server Error');
  }
};
// Both index URLs go through serveIndex so the settings are injected.
['/', '/index.html'].forEach((indexRoute) => {
  app.get(indexRoute, serveIndex);
});
// Remaining static assets; `index: false` stops express.static from serving
// index.html for directory requests itself.
app.use(express.static(path.join(__dirname, '..', 'public'), { index: false }));
// ==================== Prometheus Source CRUD ====================
// Get all Prometheus sources
/**
 * GET /api/sources
 *
 * Lists every row of `prometheus_sources` (server sources first, newest
 * first) and probes each one:
 *   - `type === 'blackbox'`: HTTP GET `<url>/metrics` with a 3 s timeout,
 *   - otherwise: `prometheusService.testConnection(url)`.
 * A reachable source is returned with `status: 'online'` and a `version`
 * string; any failure yields `status: 'offline', version: null`.
 *
 * Responds 500 `{ error }` when the listing query itself fails.
 */
app.get('/api/sources', async (req, res) => {
  try {
    const [rows] = await db.query('SELECT * FROM prometheus_sources ORDER BY is_server_source DESC, created_at DESC');

    // Test connectivity for each source
    const sourcesWithStatus = await Promise.all(rows.map(async (source) => {
      try {
        let version;
        if (source.type === 'blackbox') {
          // Simple check for blackbox exporter. The global fetch has no
          // `timeout` option, so the limit is enforced with an abort signal.
          const metricsUrl = `${source.url.replace(/\/+$/, '')}/metrics`;
          const probe = await fetch(metricsUrl, { signal: AbortSignal.timeout(3000) });
          // An unreachable or failing exporter must be reported as offline,
          // not as an online source whose "version" is an error text.
          if (!probe.ok) throw new Error('Connection Error');
          version = 'Blackbox Exporter Ready';
        } else {
          version = await prometheusService.testConnection(source.url);
        }
        return { ...source, status: 'online', version };
      } catch (e) {
        return { ...source, status: 'offline', version: null };
      }
    }));

    res.json(sourcesWithStatus);
  } catch (err) {
    console.error('Error fetching sources:', err);
    res.status(500).json({ error: 'Failed to fetch sources' });
  }
});
// Add a new Prometheus source
/**
 * POST /api/sources (requires a valid session)
 *
 * Body: `{ name, url, description, is_server_source, type }`.
 * `name` and `url` are mandatory; a URL without an http(s) scheme gets
 * `http://` prepended. `is_server_source` defaults to 1 when omitted and
 * `type` defaults to 'prometheus'. The `network_history_all` cache entry is
 * cleared after the insert.
 *
 * Responses: 201 with the stored row, 400 on missing fields, 500 on failure.
 */
app.post('/api/sources', requireAuth, async (req, res) => {
  const { name, url, description, is_server_source, type } = req.body;
  if (!name || !url) {
    return res.status(400).json({ error: 'Name and URL are required' });
  }

  const hasScheme = /^https?:\/\//i.test(url);
  const normalizedUrl = hasScheme ? url : 'http://' + url;
  // Omitted flag means "server source"; otherwise plain truthiness decides.
  const serverSourceFlag = (is_server_source === undefined || is_server_source) ? 1 : 0;

  try {
    const insertParams = [name, normalizedUrl, description || '', serverSourceFlag, type || 'prometheus'];
    const [result] = await db.query(
      'INSERT INTO prometheus_sources (name, url, description, is_server_source, type) VALUES (?, ?, ?, ?, ?)',
      insertParams
    );
    const [rows] = await db.query('SELECT * FROM prometheus_sources WHERE id = ?', [result.insertId]);

    // Clear network history cache to force refresh
    await cache.del('network_history_all');
    res.status(201).json(rows[0]);
  } catch (err) {
    console.error('Error adding source:', err);
    res.status(500).json({ error: 'Failed to add source' });
  }
});
// Update a Prometheus source
/**
 * PUT /api/sources/:id (requires a valid session)
 *
 * Body: `{ name, url, description, is_server_source, type }`.
 * Overwrites the source row identified by `:id`. `name` and `url` are
 * mandatory (same rule as the create route); a URL without an http(s) scheme
 * gets `http://` prepended; `type` defaults to 'prometheus'. The
 * `network_history_all` cache entry is cleared after the update.
 *
 * Responses: 200 with the updated row, 400 on missing fields, 404 when no row
 * has that id, 500 on failure.
 */
app.put('/api/sources/:id', requireAuth, async (req, res) => {
  // `type` must be read from the body: it is used in the UPDATE below and was
  // previously an undeclared identifier, so every call failed with a 500.
  let { name, url, description, is_server_source, type } = req.body;
  if (!name || !url) {
    return res.status(400).json({ error: 'Name and URL are required' });
  }
  if (!/^https?:\/\//i.test(url)) url = 'http://' + url;

  try {
    await db.query(
      'UPDATE prometheus_sources SET name = ?, url = ?, description = ?, is_server_source = ?, type = ? WHERE id = ?',
      [name, url, description || '', is_server_source ? 1 : 0, type || 'prometheus', req.params.id]
    );

    // Clear network history cache
    await cache.del('network_history_all');

    const [rows] = await db.query('SELECT * FROM prometheus_sources WHERE id = ?', [req.params.id]);
    if (rows.length === 0) {
      return res.status(404).json({ error: 'Source not found' });
    }
    res.json(rows[0]);
  } catch (err) {
    console.error('Error updating source:', err);
    res.status(500).json({ error: 'Failed to update source' });
  }
});
// Delete a Prometheus source
/**
 * DELETE /api/sources/:id (requires a valid session)
 *
 * Removes the source row with the given id and clears the
 * `network_history_all` cache entry. Responds `{ message: 'Source deleted' }`
 * or 500 `{ error }` on failure.
 */
app.delete('/api/sources/:id', requireAuth, async (req, res) => {
  const { id: sourceId } = req.params;
  try {
    await db.query('DELETE FROM prometheus_sources WHERE id = ?', [sourceId]);
    // Clear network history cache
    await cache.del('network_history_all');
    res.json({ message: 'Source deleted' });
  } catch (err) {
    console.error('Error deleting source:', err);
    res.status(500).json({ error: 'Failed to delete source' });
  }
});
// Test connection to a Prometheus source
/**
 * POST /api/sources/test
 *
 * Body: `{ url, type }`. A URL without an http(s) scheme gets `http://`
 * prepended. For `type === 'blackbox'` the route fetches `<url>/metrics`
 * (5 s timeout); otherwise it calls `prometheusService.testConnection(url)`.
 *
 * Responses: 200 `{ status: 'ok', version }`, 400 `{ status: 'error', message }`
 * when the URL is missing or the probe fails.
 */
app.post('/api/sources/test', async (req, res) => {
  let { url, type } = req.body || {};

  // Without this check a missing URL only failed later with an unrelated
  // TypeError message.
  if (typeof url !== 'string' || url.trim() === '') {
    return res.status(400).json({ status: 'error', message: 'URL is required' });
  }
  if (!/^https?:\/\//i.test(url)) url = 'http://' + url;

  try {
    let result;
    if (type === 'blackbox') {
      // The global fetch has no `timeout` option; an abort signal enforces the limit.
      const probe = await fetch(`${url.replace(/\/+$/, '')}/metrics`, { signal: AbortSignal.timeout(5000) }).catch(() => null);
      if (!probe || !probe.ok) throw new Error('Connection Failed');
      result = 'Blackbox Exporter Ready';
    } else {
      result = await prometheusService.testConnection(url);
    }
    res.json({ status: 'ok', version: result });
  } catch (err) {
    res.status(400).json({ status: 'error', message: err.message });
  }
});
// ==================== Site Settings ====================
// Get site settings
/**
 * GET /api/settings
 *
 * Returns `site_settings` row 1, or a fixed set of defaults when that row
 * does not exist. Responds 500 `{ error }` when the query fails.
 */
app.get('/api/settings', async (req, res) => {
  // Served only while no settings row has been stored yet.
  const fallbackSettings = {
    page_name: '数据可视化展示大屏',
    show_page_name: 1,
    title: '数据可视化展示大屏',
    logo_url: null,
    logo_url_dark: null,
    favicon_url: null,
    show_95_bandwidth: 0,
    p95_type: 'tx',
    blackbox_source_id: null,
    latency_source: null,
    latency_dest: null,
    latency_target: null,
    icp_filing: null,
    ps_filing: null
  };

  try {
    const [rows] = await db.query('SELECT * FROM site_settings WHERE id = 1');
    const stored = rows.length > 0 ? rows[0] : null;
    res.json(stored === null ? fallbackSettings : stored);
  } catch (err) {
    console.error('Error fetching settings:', err);
    res.status(500).json({ error: 'Failed to fetch settings' });
  }
});
// Update site settings
/**
 * POST /api/settings (requires a valid session)
 *
 * Upserts `site_settings` row 1. For every field the request body wins when
 * the field is present (`!== undefined`); otherwise the stored value is kept,
 * and a built-in default is used when the stored value is empty. The
 * blackbox/latency columns are never taken from the body — the stored values
 * are written back unchanged.
 *
 * Responds `{ success: true }` or 500 `{ error }`.
 */
app.post('/api/settings', requireAuth, async (req, res) => {
  // Body value if supplied, else stored value, else fallback (also used when
  // the stored value is falsy).
  const resolve = (incoming, stored, fallback) => (incoming !== undefined ? incoming : (stored || fallback));
  const asFlag = (value) => (value ? 1 : 0);

  try {
    // 1. Fetch current settings first to preserve fields not sent by the UI
    const [rows] = await db.query('SELECT * FROM site_settings WHERE id = 1');
    const current = rows.length > 0 ? rows[0] : {};
    const body = req.body;

    // 2. show_page_name keeps a stored 0, so it cannot go through resolve()
    let showPageName;
    if (body.show_page_name !== undefined) {
      showPageName = asFlag(body.show_page_name);
    } else if (current.show_page_name !== undefined) {
      showPageName = current.show_page_name;
    } else {
      showPageName = 1;
    }

    const show95Bandwidth = body.show_95_bandwidth !== undefined
      ? asFlag(body.show_95_bandwidth)
      : (current.show_95_bandwidth || 0);

    // 3. Parameters in the exact column order of the INSERT below
    const params = [
      resolve(body.page_name, current.page_name, '数据可视化展示大屏'),
      showPageName,
      resolve(body.title, current.title, '数据可视化展示大屏'),
      resolve(body.logo_url, current.logo_url, null),
      resolve(body.logo_url_dark, current.logo_url_dark, null),
      resolve(body.favicon_url, current.favicon_url, null),
      resolve(body.default_theme, current.default_theme, 'dark'),
      show95Bandwidth,
      resolve(body.p95_type, current.p95_type, 'tx'),
      current.blackbox_source_id || null, // UI doesn't send this
      current.latency_source || null, // UI doesn't send this
      current.latency_dest || null, // UI doesn't send this
      current.latency_target || null, // UI doesn't send this
      resolve(body.icp_filing, current.icp_filing, null),
      resolve(body.ps_filing, current.ps_filing, null)
    ];

    // 4. Update database
    await db.query(
      `INSERT INTO site_settings (
id, page_name, show_page_name, title, logo_url, logo_url_dark, favicon_url,
default_theme, show_95_bandwidth, p95_type,
blackbox_source_id, latency_source, latency_dest, latency_target,
icp_filing, ps_filing
) VALUES (1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
page_name = VALUES(page_name),
show_page_name = VALUES(show_page_name),
title = VALUES(title),
logo_url = VALUES(logo_url),
logo_url_dark = VALUES(logo_url_dark),
favicon_url = VALUES(favicon_url),
default_theme = VALUES(default_theme),
show_95_bandwidth = VALUES(show_95_bandwidth),
p95_type = VALUES(p95_type),
blackbox_source_id = VALUES(blackbox_source_id),
latency_source = VALUES(latency_source),
latency_dest = VALUES(latency_dest),
latency_target = VALUES(latency_target),
icp_filing = VALUES(icp_filing),
ps_filing = VALUES(ps_filing)`,
      params
    );
    res.json({ success: true });
  } catch (err) {
    console.error('Error updating settings:', err);
    res.status(500).json({ error: 'Failed to update settings' });
  }
});
// ==================== Metrics Aggregation ====================
// Reusable function to get overview metrics
/**
 * Build the aggregated overview across all non-blackbox server sources.
 *
 * For each source the per-source metrics are taken from the cache
 * (`source_metrics:<url>:<name>`) or fetched through
 * `prometheusService.getOverviewMetrics` and cached for 15 seconds. Sources
 * whose fetch fails are logged and left out. Totals are summed, percentages
 * derived, and every server entry is enriched with location columns from
 * `server_locations` when a row exists for its IP; otherwise a background
 * `geoService.getLocation` lookup is started and the server is returned
 * without geo fields. `originalInstance` is stripped from every server entry.
 *
 * @returns {Promise<object>} `{ totalServers, activeServers, cpu, memory,
 *   disk, network, traffic24h, servers }`.
 */
async function getOverview() {
  const [sources] = await db.query('SELECT * FROM prometheus_sources WHERE is_server_source = 1 AND type != "blackbox"');
  if (sources.length === 0) {
    return {
      totalServers: 0,
      activeServers: 0,
      cpu: { used: 0, total: 0, percent: 0 },
      memory: { used: 0, total: 0, percent: 0 },
      disk: { used: 0, total: 0, percent: 0 },
      network: { total: 0, rx: 0, tx: 0 },
      traffic24h: { rx: 0, tx: 0, total: 0 },
      servers: []
    };
  }

  // Cached metrics for one source, or a fresh fetch; null when the fetch fails.
  const loadSourceMetrics = async (source) => {
    const cacheKey = `source_metrics:${source.url}:${source.name}`;
    const hit = await cache.get(cacheKey);
    if (hit) return hit;
    try {
      const fresh = await prometheusService.getOverviewMetrics(source.url, source.name);
      // Don't set cache here if we want real-time WS push to be fresh,
      // but keeping it for REST API performance is fine.
      await cache.set(cacheKey, fresh, 15); // Cache for 15s
      return fresh;
    } catch (err) {
      console.error(`Error fetching metrics from ${source.name}:`, err.message);
      return null;
    }
  };

  const loaded = await Promise.all(sources.map((source) => loadSourceMetrics(source)));
  const usable = loaded.filter((entry) => entry !== null);

  // Aggregate across all sources
  const sum = {
    totalServers: 0,
    activeServers: 0,
    cpuUsed: 0,
    cpuTotal: 0,
    memUsed: 0,
    memTotal: 0,
    diskUsed: 0,
    diskTotal: 0,
    netRx: 0,
    netTx: 0,
    dayRx: 0,
    dayTx: 0
  };
  let collectedServers = [];
  usable.forEach((entry) => {
    sum.totalServers += entry.totalServers;
    // Sources that do not report activeServers count all their servers as active.
    sum.activeServers += (entry.activeServers !== undefined ? entry.activeServers : entry.totalServers);
    sum.cpuUsed += entry.cpu.used;
    sum.cpuTotal += entry.cpu.total;
    sum.memUsed += entry.memory.used;
    sum.memTotal += entry.memory.total;
    sum.diskUsed += entry.disk.used;
    sum.diskTotal += entry.disk.total;
    sum.netRx += entry.network.rx;
    sum.netTx += entry.network.tx;
    sum.dayRx += entry.traffic24h.rx;
    sum.dayTx += entry.traffic24h.tx;
    collectedServers = collectedServers.concat(entry.servers);
  });

  const usage = (used, total) => ({
    used,
    total,
    percent: total > 0 ? (used / total * 100) : 0
  });

  const overview = {
    totalServers: sum.totalServers,
    activeServers: sum.activeServers,
    cpu: usage(sum.cpuUsed, sum.cpuTotal),
    memory: usage(sum.memUsed, sum.memTotal),
    disk: usage(sum.diskUsed, sum.diskTotal),
    network: {
      total: sum.netRx + sum.netTx,
      rx: sum.netRx,
      tx: sum.netTx
    },
    traffic24h: {
      rx: sum.dayRx,
      tx: sum.dayTx,
      total: sum.dayRx + sum.dayTx
    },
    servers: collectedServers
  };

  // --- Add Geo Information to Servers ---
  const withGeo = async (server) => {
    const realInstance = server.originalInstance || prometheusService.resolveToken(server.instance);
    const cleanIp = realInstance.split(':')[0];

    let location = null;
    try {
      const [rows] = await db.query('SELECT * FROM server_locations WHERE ip = ?', [cleanIp]);
      if (rows.length > 0) {
        location = rows[0];
      } else {
        // Not awaited: its result is not used for this response.
        geoService.getLocation(cleanIp).catch(() => {});
      }
    } catch (e) {}

    const { originalInstance, ...publicServer } = server;
    if (!location) return publicServer;
    return {
      ...publicServer,
      country: location.country,
      countryName: location.country_name,
      city: location.city,
      lat: location.latitude,
      lng: location.longitude
    };
  };

  overview.servers = await Promise.all(overview.servers.map((server) => withGeo(server)));
  return overview;
}
// Get all aggregated metrics from all Prometheus sources
/**
 * GET /api/metrics/overview
 *
 * Returns the result of `getOverview()` as JSON, or 500 `{ error }` when it
 * (or sending the response) fails.
 */
app.get('/api/metrics/overview', (req, res) =>
  getOverview()
    .then((overview) => {
      res.json(overview);
    })
    .catch((err) => {
      console.error('Error fetching overview metrics:', err);
      res.status(500).json({ error: 'Failed to fetch metrics' });
    })
);
// Get network traffic history (past 24h) from Prometheus
/**
 * GET /api/metrics/network-history
 *
 * Returns the merged network history of all non-blackbox server sources.
 * The merged result is cached under `network_history_all` for 5 minutes.
 * Sources whose history cannot be fetched are logged and skipped; when no
 * source (or no usable history) exists, `{ timestamps: [], rx: [], tx: [] }`
 * is returned. Responds 500 `{ error }` on unexpected failures.
 */
app.get('/api/metrics/network-history', async (req, res) => {
  const emptyHistory = () => ({ timestamps: [], rx: [], tx: [] });
  try {
    const cacheKey = 'network_history_all';
    const cached = await cache.get(cacheKey);
    if (cached) return res.json(cached);

    const [sources] = await db.query('SELECT * FROM prometheus_sources WHERE is_server_source = 1 AND type != "blackbox"');
    if (sources.length === 0) {
      return res.json(emptyHistory());
    }

    const histories = await Promise.all(sources.map((source) =>
      prometheusService.getNetworkHistory(source.url).catch((err) => {
        console.error(`Error fetching network history from ${source.name}:`, err.message);
        return null;
      })
    ));
    const validHistories = histories.filter((h) => h !== null);
    if (validHistories.length === 0) {
      return res.json(emptyHistory());
    }

    const merged = prometheusService.mergeNetworkHistories(validHistories);
    await cache.set(cacheKey, merged, 300); // Cache for 5 minutes
    res.json(merged);
  } catch (err) {
    // Messages previously repeated the word "history" by mistake.
    console.error('Error fetching network history:', err);
    res.status(500).json({ error: 'Failed to fetch network history' });
  }
});
// Get CPU usage history for sparklines
/**
 * GET /api/metrics/cpu-history
 *
 * Returns the merged CPU history of all non-blackbox server sources. Sources
 * whose history cannot be fetched are logged and skipped; when nothing usable
 * remains, `{ timestamps: [], values: [] }` is returned. Responds 500
 * `{ error }` on unexpected failures.
 */
app.get('/api/metrics/cpu-history', async (req, res) => {
  // History of one source, or null (after logging) when it cannot be fetched.
  const loadHistory = (source) =>
    prometheusService.getCpuHistory(source.url).catch((err) => {
      console.error(`Error fetching CPU history from ${source.name}:`, err.message);
      return null;
    });

  try {
    const [sources] = await db.query('SELECT * FROM prometheus_sources WHERE is_server_source = 1 AND type != "blackbox"');
    if (sources.length === 0) {
      return res.json({ timestamps: [], values: [] });
    }

    const fetched = await Promise.all(sources.map((source) => loadHistory(source)));
    const usable = fetched.filter((history) => history !== null);
    if (usable.length === 0) {
      return res.json({ timestamps: [], values: [] });
    }

    res.json(prometheusService.mergeCpuHistories(usable));
  } catch (err) {
    console.error('Error fetching CPU history:', err);
    res.status(500).json({ error: 'Failed to fetch CPU history' });
  }
});
// Get detailed metrics for a specific server
/**
 * GET /api/metrics/server-details?instance=&job=&source=
 *
 * Resolves the Prometheus URL of the source named `source` and returns
 * `prometheusService.getServerDetails(url, instance, job)`.
 *
 * Responses: 400 when a query parameter is missing, 404 when the source name
 * is unknown, 500 `{ error }` on failure.
 */
app.get('/api/metrics/server-details', async (req, res) => {
  const { instance, job, source } = req.query;
  const hasAllParams = Boolean(instance) && Boolean(job) && Boolean(source);
  if (!hasAllParams) {
    return res.status(400).json({ error: 'instance, job, and source name are required' });
  }

  try {
    // Find the source URL by name
    const [matches] = await db.query('SELECT url FROM prometheus_sources WHERE name = ?', [source]);
    if (matches.length === 0) {
      return res.status(404).json({ error: 'Prometheus source not found' });
    }

    // Fetch detailed metrics
    const [{ url: sourceUrl }] = matches;
    res.json(await prometheusService.getServerDetails(sourceUrl, instance, job));
  } catch (err) {
    console.error(`Error fetching server details for ${instance}:`, err.message);
    res.status(500).json({ error: 'Failed to fetch server details' });
  }
});
// Get historical metrics for a specific server
/**
 * GET /api/metrics/server-history?instance=&job=&source=&metric=[&range=&start=&end=]
 *
 * Resolves the Prometheus URL of the source named `source` and returns
 * `prometheusService.getServerHistory(url, instance, job, metric, range, start, end)`.
 *
 * Responses: 400 when a required parameter is missing, 404 when the source
 * name is unknown, 500 `{ error: <message> }` on failure.
 */
app.get('/api/metrics/server-history', async (req, res) => {
  const { instance, job, source, metric, range, start, end } = req.query;
  const missingRequired = [instance, job, source, metric].some((value) => !value);
  if (missingRequired) {
    return res.status(400).json({ error: 'instance, job, source, and metric are required' });
  }

  try {
    const [matches] = await db.query('SELECT url FROM prometheus_sources WHERE name = ?', [source]);
    if (matches.length === 0) {
      return res.status(404).json({ error: 'Source not found' });
    }

    const [{ url: sourceUrl }] = matches;
    const history = await prometheusService.getServerHistory(sourceUrl, instance, job, metric, range, start, end);
    res.json(history);
  } catch (err) {
    res.status(500).json({ error: err.message });
  }
});
// SPA fallback
/**
 * Serve the index page for any GET path that is neither an API call nor a
 * file-like path (one containing a dot); those are passed on to later handlers.
 */
app.get('*', (req, res, next) => {
  const isApiCall = req.path.startsWith('/api/');
  const looksLikeFile = req.path.includes('.');

  if (!isApiCall && !looksLikeFile) {
    serveIndex(req, res);
    return;
  }
  return next();
});
// ==================== Latency Routes CRUD ====================
/**
 * GET /api/latency-routes
 *
 * Lists all latency routes (newest first) together with the name of the
 * Prometheus source each one refers to (`source_name`, NULL when the source
 * row is missing because of the LEFT JOIN). Responds 500 `{ error }` on failure.
 */
app.get('/api/latency-routes', async (req, res) => {
  const listRoutesSql = `
SELECT r.*, s.name as source_name
FROM latency_routes r
LEFT JOIN prometheus_sources s ON r.source_id = s.id
ORDER BY r.created_at DESC
`;
  try {
    const [routes] = await db.query(listRoutesSql);
    res.json(routes);
  } catch (err) {
    res.status(500).json({ error: 'Failed to fetch latency routes' });
  }
});
/**
 * POST /api/latency-routes (requires a valid session)
 *
 * Body: `{ source_id, latency_source, latency_dest, latency_target }`.
 * Inserts a new latency route. Responds `{ success: true }` or 500 `{ error }`.
 */
app.post('/api/latency-routes', requireAuth, async (req, res) => {
  const { source_id, latency_source, latency_dest, latency_target } = req.body;
  const insertParams = [source_id, latency_source, latency_dest, latency_target];
  try {
    await db.query(
      'INSERT INTO latency_routes (source_id, latency_source, latency_dest, latency_target) VALUES (?, ?, ?, ?)',
      insertParams
    );
    res.json({ success: true });
  } catch (err) {
    res.status(500).json({ error: 'Failed to add latency route' });
  }
});
/**
 * DELETE /api/latency-routes/:id (requires a valid session)
 *
 * Deletes the latency route with the given id. Responds `{ success: true }`
 * or 500 `{ error }`.
 */
app.delete('/api/latency-routes/:id', requireAuth, async (req, res) => {
  const { id: routeId } = req.params;
  try {
    await db.query('DELETE FROM latency_routes WHERE id = ?', [routeId]);
    res.json({ success: true });
  } catch (err) {
    res.status(500).json({ error: 'Failed to delete latency route' });
  }
});
/**
 * PUT /api/latency-routes/:id (requires a valid session)
 *
 * Body: `{ source_id, latency_source, latency_dest, latency_target }`.
 * Overwrites those four columns of the latency route with the given id.
 * Responds `{ success: true }` or 500 `{ error }`.
 */
app.put('/api/latency-routes/:id', requireAuth, async (req, res) => {
  const { source_id, latency_source, latency_dest, latency_target } = req.body;
  const updateSql = 'UPDATE latency_routes SET source_id = ?, latency_source = ?, latency_dest = ?, latency_target = ? WHERE id = ?';
  const updateParams = [source_id, latency_source, latency_dest, latency_target, req.params.id];
  try {
    await db.query(updateSql, updateParams);
    res.json({ success: true });
  } catch (err) {
    res.status(500).json({ error: 'Failed to update latency route' });
  }
});
// ==================== Metrics Latency ====================
/**
 * GET /api/metrics/latency
 *
 * Returns `{ routes: [{ id, source, dest, latency }] }` for every latency
 * route whose source still exists. The latency is read from the cache key
 * `latency:route:<id>`; only when that value is exactly `null` and the source
 * type is 'prometheus' is it queried live through
 * `prometheusService.getLatency`. Responds 500 `{ error }` on failure.
 */
app.get('/api/metrics/latency', async (req, res) => {
  const measure = async (route) => {
    // Try to get from Valkey first (filled by background latencyService)
    const cachedLatency = await cache.get(`latency:route:${route.id}`);

    // Fallback if not in cache (only for prometheus sources, blackbox sources rely on the background service)
    const needsLiveQuery = cachedLatency === null && route.source_type === 'prometheus';
    const latency = needsLiveQuery
      ? await prometheusService.getLatency(route.url, route.latency_target)
      : cachedLatency;

    return {
      id: route.id,
      source: route.latency_source,
      dest: route.latency_dest,
      latency
    };
  };

  try {
    const [routes] = await db.query(`
SELECT r.*, s.url, s.type as source_type
FROM latency_routes r
JOIN prometheus_sources s ON r.source_id = s.id
`);
    if (routes.length === 0) {
      return res.json({ routes: [] });
    }

    const measured = await Promise.all(routes.map((route) => measure(route)));
    res.json({ routes: measured });
  } catch (err) {
    console.error('Error fetching latencies:', err);
    res.status(500).json({ error: 'Failed to fetch latency' });
  }
});
// ==================== WebSocket Server ====================
// Plain HTTP server wrapping the Express app; it is the object that listens
// on PORT/HOST at the end of this file.
const server = http.createServer(app);
// WebSocket server attached to that same HTTP server; its connected clients
// are the recipients of broadcast().
const wss = new WebSocket.Server({ server });
/**
 * Send `data`, serialized once as JSON, to every WebSocket client whose
 * connection is currently open.
 *
 * @param {*} data - JSON-serializable payload.
 * @returns {void}
 */
function broadcast(data) {
  const payload = JSON.stringify(data);
  for (const socket of wss.clients) {
    // Skip sockets that are still connecting or already closing/closed.
    if (socket.readyState !== WebSocket.OPEN) continue;
    socket.send(payload);
  }
}
// Broadcast loop
/**
 * Compute the current overview and push it to all WebSocket clients as
 * `{ type: 'overview', data }`. Failures are deliberately ignored so a failed
 * tick does not produce a rejected promise.
 *
 * @returns {Promise<void>}
 */
async function broadcastMetrics() {
  await getOverview()
    .then((overview) => {
      broadcast({ type: 'overview', data: overview });
    })
    .catch(() => {
      // Intentionally silent (error logging is disabled for this loop).
    });
}
// Start services
// Run the database integrity check and start the background latency service.
checkAndFixDatabase();
latencyService.start();

// Push a fresh overview to WebSocket clients every REFRESH_INTERVAL ms
// (5000 when the variable is unset or not a number).
const REFRESH_INT = parseInt(process.env.REFRESH_INTERVAL) || 5000;
setInterval(broadcastMetrics, REFRESH_INT);

server.listen(PORT, HOST, () => {
  // The wildcard bind address is shown as "localhost" in the printed URLs.
  const displayHost = HOST === '0.0.0.0' ? 'localhost' : HOST;
  const baseUrl = `http://${displayHost}:${PORT}`;
  console.log(`\n 🚀 Data Visualization Display Wall (WebSocket Enabled)`);
  console.log(` 📊 Server running at ${baseUrl}`);
  console.log(` ⚙️ Configure Prometheus sources at ${baseUrl}/settings\n`);
});