进一步优化算法 引入Websocket
This commit is contained in:
286
server/index.js
286
server/index.js
@@ -7,6 +7,8 @@ const prometheusService = require('./prometheus-service');
|
||||
const cache = require('./cache');
|
||||
const geoService = require('./geo-service');
|
||||
const checkAndFixDatabase = require('./db-integrity-check');
|
||||
const http = require('http');
|
||||
const WebSocket = require('ws');
|
||||
|
||||
const app = express();
|
||||
const PORT = process.env.PORT || 3000;
|
||||
@@ -570,136 +572,135 @@ app.post('/api/settings', requireAuth, async (req, res) => {
|
||||
|
||||
// ==================== Metrics Aggregation ====================
|
||||
|
||||
// Reusable function to get overview metrics
|
||||
async function getOverview() {
|
||||
const [sources] = await db.query('SELECT * FROM prometheus_sources');
|
||||
if (sources.length === 0) {
|
||||
return {
|
||||
totalServers: 0,
|
||||
activeServers: 0,
|
||||
cpu: { used: 0, total: 0, percent: 0 },
|
||||
memory: { used: 0, total: 0, percent: 0 },
|
||||
disk: { used: 0, total: 0, percent: 0 },
|
||||
network: { total: 0, rx: 0, tx: 0 },
|
||||
traffic24h: { rx: 0, tx: 0, total: 0 },
|
||||
servers: []
|
||||
};
|
||||
}
|
||||
|
||||
const allMetrics = await Promise.all(sources.map(async (source) => {
|
||||
const cacheKey = `source_metrics:${source.url}:${source.name}`;
|
||||
const cached = await cache.get(cacheKey);
|
||||
if (cached) return cached;
|
||||
|
||||
try {
|
||||
const metrics = await prometheusService.getOverviewMetrics(source.url, source.name);
|
||||
// Don't set cache here if we want real-time WS push to be fresh,
|
||||
// but keeping it for REST API performance is fine.
|
||||
await cache.set(cacheKey, metrics, 15); // Cache for 15s
|
||||
return metrics;
|
||||
} catch (err) {
|
||||
console.error(`Error fetching metrics from ${source.name}:`, err.message);
|
||||
return null;
|
||||
}
|
||||
}));
|
||||
|
||||
const validMetrics = allMetrics.filter(m => m !== null);
|
||||
|
||||
// Aggregate across all sources
|
||||
let totalServers = 0;
|
||||
let activeServers = 0;
|
||||
let cpuUsed = 0, cpuTotal = 0;
|
||||
let memUsed = 0, memTotal = 0;
|
||||
let diskUsed = 0, diskTotal = 0;
|
||||
let netRx = 0, netTx = 0;
|
||||
let traffic24hRx = 0, traffic24hTx = 0;
|
||||
let allServers = [];
|
||||
|
||||
for (const m of validMetrics) {
|
||||
totalServers += m.totalServers;
|
||||
activeServers += (m.activeServers !== undefined ? m.activeServers : m.totalServers);
|
||||
cpuUsed += m.cpu.used;
|
||||
cpuTotal += m.cpu.total;
|
||||
memUsed += m.memory.used;
|
||||
memTotal += m.memory.total;
|
||||
diskUsed += m.disk.used;
|
||||
diskTotal += m.disk.total;
|
||||
netRx += m.network.rx;
|
||||
netTx += m.network.tx;
|
||||
traffic24hRx += m.traffic24h.rx;
|
||||
traffic24hTx += m.traffic24h.tx;
|
||||
allServers = allServers.concat(m.servers);
|
||||
}
|
||||
|
||||
const overview = {
|
||||
totalServers,
|
||||
activeServers,
|
||||
cpu: {
|
||||
used: cpuUsed,
|
||||
total: cpuTotal,
|
||||
percent: cpuTotal > 0 ? (cpuUsed / cpuTotal * 100) : 0
|
||||
},
|
||||
memory: {
|
||||
used: memUsed,
|
||||
total: memTotal,
|
||||
percent: memTotal > 0 ? (memUsed / memTotal * 100) : 0
|
||||
},
|
||||
disk: {
|
||||
used: diskUsed,
|
||||
total: diskTotal,
|
||||
percent: diskTotal > 0 ? (diskUsed / diskTotal * 100) : 0
|
||||
},
|
||||
network: {
|
||||
total: netRx + netTx,
|
||||
rx: netRx,
|
||||
tx: netTx
|
||||
},
|
||||
traffic24h: {
|
||||
rx: traffic24hRx,
|
||||
tx: traffic24hTx,
|
||||
total: traffic24hRx + traffic24hTx
|
||||
},
|
||||
servers: allServers
|
||||
};
|
||||
|
||||
// --- Add Geo Information to Servers ---
|
||||
const geoServers = await Promise.all(overview.servers.map(async (server) => {
|
||||
const realInstance = server.originalInstance || prometheusService.resolveToken(server.instance);
|
||||
const cleanIp = realInstance.split(':')[0];
|
||||
|
||||
let geoData = null;
|
||||
try {
|
||||
const [rows] = await db.query('SELECT * FROM server_locations WHERE ip = ?', [cleanIp]);
|
||||
if (rows.length > 0) {
|
||||
geoData = rows[0];
|
||||
} else {
|
||||
geoService.getLocation(cleanIp).catch(() => {});
|
||||
}
|
||||
} catch (e) {}
|
||||
|
||||
const { originalInstance, ...safeServer } = server;
|
||||
if (geoData) {
|
||||
return {
|
||||
...safeServer,
|
||||
country: geoData.country,
|
||||
countryName: geoData.country_name,
|
||||
city: geoData.city,
|
||||
lat: geoData.latitude,
|
||||
lng: geoData.longitude
|
||||
};
|
||||
}
|
||||
return safeServer;
|
||||
}));
|
||||
|
||||
overview.servers = geoServers;
|
||||
return overview;
|
||||
}
|
||||
|
||||
// Get all aggregated metrics from all Prometheus sources
|
||||
app.get('/api/metrics/overview', async (req, res) => {
|
||||
try {
|
||||
const [sources] = await db.query('SELECT * FROM prometheus_sources');
|
||||
if (sources.length === 0) {
|
||||
return res.json({
|
||||
totalServers: 0,
|
||||
cpu: { used: 0, total: 0, percent: 0 },
|
||||
memory: { used: 0, total: 0, percent: 0 },
|
||||
disk: { used: 0, total: 0, percent: 0 },
|
||||
network: { total: 0, rx: 0, tx: 0 },
|
||||
traffic24h: { rx: 0, tx: 0, total: 0 },
|
||||
servers: []
|
||||
});
|
||||
}
|
||||
|
||||
const allMetrics = await Promise.all(sources.map(async (source) => {
|
||||
const cacheKey = `source_metrics:${source.url}:${source.name}`;
|
||||
const cached = await cache.get(cacheKey);
|
||||
if (cached) return cached;
|
||||
|
||||
try {
|
||||
const metrics = await prometheusService.getOverviewMetrics(source.url, source.name);
|
||||
await cache.set(cacheKey, metrics, 15); // Cache for 15s
|
||||
return metrics;
|
||||
} catch (err) {
|
||||
console.error(`Error fetching metrics from ${source.name}:`, err.message);
|
||||
return null;
|
||||
}
|
||||
}));
|
||||
|
||||
const validMetrics = allMetrics.filter(m => m !== null);
|
||||
|
||||
// Aggregate across all sources
|
||||
let totalServers = 0;
|
||||
let activeServers = 0;
|
||||
let cpuUsed = 0, cpuTotal = 0;
|
||||
let memUsed = 0, memTotal = 0;
|
||||
let diskUsed = 0, diskTotal = 0;
|
||||
let netRx = 0, netTx = 0;
|
||||
let traffic24hRx = 0, traffic24hTx = 0;
|
||||
let allServers = [];
|
||||
|
||||
for (const m of validMetrics) {
|
||||
totalServers += m.totalServers;
|
||||
activeServers += m.activeServers || m.totalServers; // Default if missing
|
||||
cpuUsed += m.cpu.used;
|
||||
cpuTotal += m.cpu.total;
|
||||
memUsed += m.memory.used;
|
||||
memTotal += m.memory.total;
|
||||
diskUsed += m.disk.used;
|
||||
diskTotal += m.disk.total;
|
||||
netRx += m.network.rx;
|
||||
netTx += m.network.tx;
|
||||
traffic24hRx += m.traffic24h.rx;
|
||||
traffic24hTx += m.traffic24h.tx;
|
||||
allServers = allServers.concat(m.servers);
|
||||
}
|
||||
|
||||
|
||||
|
||||
const overview = {
|
||||
totalServers,
|
||||
activeServers,
|
||||
cpu: {
|
||||
used: cpuUsed,
|
||||
total: cpuTotal,
|
||||
percent: cpuTotal > 0 ? (cpuUsed / cpuTotal * 100) : 0
|
||||
},
|
||||
memory: {
|
||||
used: memUsed,
|
||||
total: memTotal,
|
||||
percent: memTotal > 0 ? (memUsed / memTotal * 100) : 0
|
||||
},
|
||||
disk: {
|
||||
used: diskUsed,
|
||||
total: diskTotal,
|
||||
percent: diskTotal > 0 ? (diskUsed / diskTotal * 100) : 0
|
||||
},
|
||||
network: {
|
||||
total: netRx + netTx,
|
||||
rx: netRx,
|
||||
tx: netTx
|
||||
},
|
||||
traffic24h: {
|
||||
rx: traffic24hRx,
|
||||
tx: traffic24hTx,
|
||||
total: traffic24hRx + traffic24hTx
|
||||
},
|
||||
servers: allServers
|
||||
};
|
||||
|
||||
// --- Add Geo Information to Servers ---
|
||||
const geoServers = await Promise.all(overview.servers.map(async (server) => {
|
||||
// Use originalInstance if available for correct location lookup
|
||||
const realInstance = server.originalInstance || prometheusService.resolveToken(server.instance);
|
||||
const cleanIp = realInstance.split(':')[0];
|
||||
|
||||
let geoData = null;
|
||||
|
||||
// Try to get from DB cache only (fast)
|
||||
try {
|
||||
const [rows] = await db.query('SELECT * FROM server_locations WHERE ip = ?', [cleanIp]);
|
||||
if (rows.length > 0) {
|
||||
geoData = rows[0];
|
||||
} else {
|
||||
// Trigger background resolution for future requests
|
||||
geoService.getLocation(cleanIp).catch(() => {});
|
||||
}
|
||||
} catch (e) {
|
||||
// DB error, skip geo for now
|
||||
}
|
||||
|
||||
// Prepare the server object without sensitive originalInstance
|
||||
const { originalInstance, ...safeServer } = server;
|
||||
|
||||
if (geoData) {
|
||||
return {
|
||||
...safeServer,
|
||||
country: geoData.country,
|
||||
countryName: geoData.country_name,
|
||||
city: geoData.city,
|
||||
lat: geoData.latitude,
|
||||
lng: geoData.longitude
|
||||
};
|
||||
}
|
||||
return safeServer;
|
||||
}));
|
||||
|
||||
overview.servers = geoServers;
|
||||
const overview = await getOverview();
|
||||
res.json(overview);
|
||||
} catch (err) {
|
||||
console.error('Error fetching overview metrics:', err);
|
||||
@@ -820,11 +821,38 @@ app.get('*', (req, res, next) => {
|
||||
});
|
||||
|
||||
|
||||
// ==================== WebSocket Server ====================
|
||||
|
||||
const server = http.createServer(app);
|
||||
const wss = new WebSocket.Server({ server });
|
||||
|
||||
function broadcast(data) {
|
||||
const message = JSON.stringify(data);
|
||||
wss.clients.forEach(client => {
|
||||
if (client.readyState === WebSocket.OPEN) {
|
||||
client.send(message);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Broadcast loop
|
||||
async function broadcastMetrics() {
|
||||
try {
|
||||
const overview = await getOverview();
|
||||
broadcast({ type: 'overview', data: overview });
|
||||
} catch (err) {
|
||||
// console.error('WS Broadcast error:', err.message);
|
||||
}
|
||||
}
|
||||
|
||||
// Check and fix database integrity on startup
|
||||
checkAndFixDatabase();
|
||||
|
||||
app.listen(PORT, HOST, () => {
|
||||
console.log(`\n 🚀 Data Visualization Display Wall`);
|
||||
const REFRESH_INT = parseInt(process.env.REFRESH_INTERVAL) || 5000;
|
||||
setInterval(broadcastMetrics, REFRESH_INT);
|
||||
|
||||
server.listen(PORT, HOST, () => {
|
||||
console.log(`\n 🚀 Data Visualization Display Wall (WebSocket Enabled)`);
|
||||
console.log(` 📊 Server running at http://${HOST === '0.0.0.0' ? 'localhost' : HOST}:${PORT}`);
|
||||
console.log(` ⚙️ Configure Prometheus sources at http://${HOST === '0.0.0.0' ? 'localhost' : HOST}:${PORT}/settings\n`);
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user