利用 Prometheus 自身的数据持久化能力保证统计的准确性
This commit is contained in:
101
server/index.js
101
server/index.js
@@ -535,19 +535,7 @@ app.get('/api/metrics/overview', async (req, res) => {
|
|||||||
allServers = allServers.concat(m.servers);
|
allServers = allServers.concat(m.servers);
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- 24h Traffic from DB (Integrating Bandwidth) ---
|
|
||||||
try {
|
|
||||||
// Each record represents a 5-second interval
|
|
||||||
const [sumRows] = await db.query('SELECT SUM(rx_bandwidth) as sumRx, SUM(tx_bandwidth) as sumTx FROM traffic_stats WHERE timestamp >= NOW() - INTERVAL 1 DAY');
|
|
||||||
|
|
||||||
if (sumRows.length > 0 && sumRows[0].sumRx !== null) {
|
|
||||||
// Total bytes = Sum of (bytes/sec) * 5 seconds
|
|
||||||
traffic24hRx = sumRows[0].sumRx * 5;
|
|
||||||
traffic24hTx = sumRows[0].sumTx * 5;
|
|
||||||
}
|
|
||||||
} catch (err) {
|
|
||||||
console.error('Error calculating 24h traffic from DB integration:', err);
|
|
||||||
}
|
|
||||||
|
|
||||||
const overview = {
|
const overview = {
|
||||||
totalServers,
|
totalServers,
|
||||||
@@ -616,23 +604,31 @@ app.get('/api/metrics/overview', async (req, res) => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// Get network traffic history from DB (past 24h)
|
// Get network traffic history (past 24h) from Prometheus
|
||||||
app.get('/api/metrics/network-history', async (req, res) => {
|
app.get('/api/metrics/network-history', async (req, res) => {
|
||||||
try {
|
try {
|
||||||
const [rows] = await db.query('SELECT rx_bandwidth, tx_bandwidth, UNIX_TIMESTAMP(timestamp) as ts FROM traffic_stats WHERE timestamp >= NOW() - INTERVAL 1 DAY ORDER BY ts ASC');
|
const [sources] = await db.query('SELECT * FROM prometheus_sources');
|
||||||
|
if (sources.length === 0) {
|
||||||
if (rows.length === 0) {
|
|
||||||
return res.json({ timestamps: [], rx: [], tx: [] });
|
return res.json({ timestamps: [], rx: [], tx: [] });
|
||||||
}
|
}
|
||||||
|
|
||||||
res.json({
|
const histories = await Promise.all(sources.map(source =>
|
||||||
timestamps: rows.map(r => r.ts * 1000),
|
prometheusService.getNetworkHistory(source.url).catch(err => {
|
||||||
rx: rows.map(r => r.rx_bandwidth),
|
console.error(`Error fetching network history from ${source.name}:`, err.message);
|
||||||
tx: rows.map(r => r.tx_bandwidth)
|
return null;
|
||||||
});
|
})
|
||||||
|
));
|
||||||
|
|
||||||
|
const validHistories = histories.filter(h => h !== null);
|
||||||
|
if (validHistories.length === 0) {
|
||||||
|
return res.json({ timestamps: [], rx: [], tx: [] });
|
||||||
|
}
|
||||||
|
|
||||||
|
const merged = prometheusService.mergeNetworkHistories(validHistories);
|
||||||
|
res.json(merged);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error('Error fetching network history from DB:', err);
|
console.error('Error fetching network history history:', err);
|
||||||
res.status(500).json({ error: 'Failed to fetch network history' });
|
res.status(500).json({ error: 'Failed to fetch network history history' });
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -716,66 +712,9 @@ app.get('*', (req, res, next) => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
async function recordTrafficStats() {
|
|
||||||
if (!isDbInitialized) return;
|
|
||||||
try {
|
|
||||||
const [sources] = await db.query('SELECT * FROM prometheus_sources');
|
|
||||||
if (sources.length === 0) return;
|
|
||||||
|
|
||||||
let totalRxBytes = 0;
|
|
||||||
let totalTxBytes = 0;
|
|
||||||
let totalRxBandwidth = 0;
|
|
||||||
let totalTxBandwidth = 0;
|
|
||||||
|
|
||||||
const results = await Promise.all(sources.map(async source => {
|
|
||||||
try {
|
|
||||||
const [rxBytesRes, txBytesRes, rxBWRes, txBWRes] = await Promise.all([
|
|
||||||
prometheusService.query(source.url, 'sum(node_network_receive_bytes_total{device!~"lo|veth.*|docker.*|br-.*"})'),
|
|
||||||
prometheusService.query(source.url, 'sum(node_network_transmit_bytes_total{device!~"lo|veth.*|docker.*|br-.*"})'),
|
|
||||||
prometheusService.query(source.url, 'sum(rate(node_network_receive_bytes_total{device!~"lo|veth.*|docker.*|br-.*"}[1m]))'),
|
|
||||||
prometheusService.query(source.url, 'sum(rate(node_network_transmit_bytes_total{device!~"lo|veth.*|docker.*|br-.*"}[1m]))')
|
|
||||||
]);
|
|
||||||
|
|
||||||
return {
|
|
||||||
rxBytes: (rxBytesRes.length > 0) ? parseFloat(rxBytesRes[0].value[1]) : 0,
|
|
||||||
txBytes: (txBytesRes.length > 0) ? parseFloat(txBytesRes[0].value[1]) : 0,
|
|
||||||
rxBW: (rxBWRes.length > 0) ? parseFloat(rxBWRes[0].value[1]) : 0,
|
|
||||||
txBW: (txBWRes.length > 0) ? parseFloat(txBWRes[0].value[1]) : 0
|
|
||||||
};
|
|
||||||
} catch (e) {
|
|
||||||
return { rxBytes: 0, txBytes: 0, rxBW: 0, txBW: 0 };
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
|
|
||||||
for (const r of results) {
|
|
||||||
totalRxBytes += r.rxBytes;
|
|
||||||
totalTxBytes += r.txBytes;
|
|
||||||
totalRxBandwidth += r.rxBW;
|
|
||||||
totalTxBandwidth += r.txBW;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Always insert a record if we have sources, so the timeline advances
|
|
||||||
// Even if traffic is 0, we want to see 0 on the chart
|
|
||||||
await db.query('INSERT INTO traffic_stats (rx_bytes, tx_bytes, rx_bandwidth, tx_bandwidth) VALUES (?, ?, ?, ?)', [
|
|
||||||
Math.round(totalRxBytes),
|
|
||||||
Math.round(totalTxBytes),
|
|
||||||
totalRxBandwidth,
|
|
||||||
totalTxBandwidth
|
|
||||||
]);
|
|
||||||
console.log(`[Traffic Recorder] Saved stats: BW_RX=${totalRxBandwidth.toFixed(2)}, BW_TX=${totalTxBandwidth.toFixed(2)}`);
|
|
||||||
} catch (err) {
|
|
||||||
console.error('[Traffic Recorder] Error recording stats:', err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check and fix database integrity on startup
|
// Check and fix database integrity on startup
|
||||||
checkAndFixDatabase();
|
checkAndFixDatabase();
|
||||||
|
|
||||||
// Record traffic every 5 seconds (17,280 points/day)
|
|
||||||
setInterval(recordTrafficStats, 5 * 1000);
|
|
||||||
// Initial record after a short delay
|
|
||||||
setTimeout(recordTrafficStats, 10000);
|
|
||||||
|
|
||||||
app.listen(PORT, HOST, () => {
|
app.listen(PORT, HOST, () => {
|
||||||
console.log(`\n 🚀 Data Visualization Display Wall`);
|
console.log(`\n 🚀 Data Visualization Display Wall`);
|
||||||
console.log(` 📊 Server running at http://${HOST === '0.0.0.0' ? 'localhost' : HOST}:${PORT}`);
|
console.log(` 📊 Server running at http://${HOST === '0.0.0.0' ? 'localhost' : HOST}:${PORT}`);
|
||||||
|
|||||||
@@ -188,8 +188,6 @@ async function getOverviewMetrics(url, sourceName) {
|
|||||||
diskFreeResult,
|
diskFreeResult,
|
||||||
netRxResult,
|
netRxResult,
|
||||||
netTxResult,
|
netTxResult,
|
||||||
traffic24hRxResult,
|
|
||||||
traffic24hTxResult,
|
|
||||||
targetsResult
|
targetsResult
|
||||||
] = await Promise.all([
|
] = await Promise.all([
|
||||||
// CPU usage per instance: 1 - avg idle
|
// CPU usage per instance: 1 - avg idle
|
||||||
@@ -208,14 +206,13 @@ async function getOverviewMetrics(url, sourceName) {
|
|||||||
query(url, 'sum by (instance, job) (rate(node_network_receive_bytes_total{device!~"lo|veth.*|docker.*|br-.*"}[1m]))').catch(() => []),
|
query(url, 'sum by (instance, job) (rate(node_network_receive_bytes_total{device!~"lo|veth.*|docker.*|br-.*"}[1m]))').catch(() => []),
|
||||||
// Network transmit rate (bytes/sec)
|
// Network transmit rate (bytes/sec)
|
||||||
query(url, 'sum by (instance, job) (rate(node_network_transmit_bytes_total{device!~"lo|veth.*|docker.*|br-.*"}[1m]))').catch(() => []),
|
query(url, 'sum by (instance, job) (rate(node_network_transmit_bytes_total{device!~"lo|veth.*|docker.*|br-.*"}[1m]))').catch(() => []),
|
||||||
// Total traffic received in last 24h
|
|
||||||
query(url, 'sum by (instance, job) (increase(node_network_receive_bytes_total{device!~"lo|veth.*|docker.*|br-.*"}[24h]))').catch(() => []),
|
|
||||||
// Total traffic transmitted in last 24h
|
|
||||||
query(url, 'sum by (instance, job) (increase(node_network_transmit_bytes_total{device!~"lo|veth.*|docker.*|br-.*"}[24h]))').catch(() => []),
|
|
||||||
// Targets status from /api/v1/targets
|
// Targets status from /api/v1/targets
|
||||||
getTargets(url).catch(() => [])
|
getTargets(url).catch(() => [])
|
||||||
]);
|
]);
|
||||||
|
|
||||||
|
// Fetch 24h detailed traffic using the A*duration logic
|
||||||
|
const traffic24hSum = await get24hTrafficSum(url).catch(() => ({ rx: 0, tx: 0 }));
|
||||||
|
|
||||||
// Build per-instance data map
|
// Build per-instance data map
|
||||||
const instances = new Map();
|
const instances = new Map();
|
||||||
|
|
||||||
@@ -334,13 +331,9 @@ async function getOverviewMetrics(url, sourceName) {
|
|||||||
totalNetTx += inst.netTx;
|
totalNetTx += inst.netTx;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse 24h traffic
|
// Use the pre-calculated 24h traffic
|
||||||
for (const r of traffic24hRxResult) {
|
totalTraffic24hRx = traffic24hSum.rx;
|
||||||
totalTraffic24hRx += parseFloat(r.value[1]) || 0;
|
totalTraffic24hTx = traffic24hSum.tx;
|
||||||
}
|
|
||||||
for (const r of traffic24hTxResult) {
|
|
||||||
totalTraffic24hTx += parseFloat(r.value[1]) || 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
return {
|
return {
|
||||||
totalServers: allInstancesList.length,
|
totalServers: allInstancesList.length,
|
||||||
@@ -377,13 +370,53 @@ async function getOverviewMetrics(url, sourceName) {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get network traffic history (past 24h, 15-min intervals)
|
* Calculate total traffic from bandwidth data points using the A*duration logic
|
||||||
|
*/
|
||||||
|
function calculateTrafficFromHistory(values) {
  // Purpose: integrate a series of bandwidth samples into a total amount of
  // traffic. Each sample's value is held constant until the next sample's
  // timestamp (value_i * (ts_{i+1} - ts_i)); the final sample only closes the
  // last interval and contributes no traffic of its own.
  //
  // @param {Array<[number|string, number|string]>} values - ordered
  //   [timestamp, value] pairs. NOTE(review): the caller in this file passes
  //   the `values` array of a range-query result, so timestamps are presumably
  //   unix seconds and values are rate strings in bytes/sec - confirm against
  //   queryRange.
  // @returns {number} the integrated total; 0 when fewer than two samples are
  //   available or the input is not an array.
  if (!Array.isArray(values) || values.length < 2) return 0;

  let totalBytes = 0;
  for (let i = 0; i < values.length - 1; i++) {
    const [tsA, valA] = values[i];
    const [tsB] = values[i + 1];
    const rate = parseFloat(valA);
    const duration = tsB - tsA;

    // A single unparsable sample (e.g. the string "NaN") would otherwise turn
    // the whole running total into NaN, so skip just that interval.
    if (!Number.isFinite(rate) || !Number.isFinite(duration)) continue;

    totalBytes += rate * duration;
  }
  return totalBytes;
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get total traffic for the past 24h by fetching all points and integrating
|
||||||
|
*/
|
||||||
|
async function get24hTrafficSum(url) {
  // Purpose: total received/transmitted traffic over the past 24 hours for one
  // Prometheus source, obtained by fetching per-minute bandwidth points and
  // integrating them with calculateTrafficFromHistory.
  //
  // @param {string} url - base URL of the Prometheus source, passed through to
  //   queryRange unchanged.
  // @returns {Promise<{rx: number, tx: number}>} integrated totals; a direction
  //   whose query fails or returns no series contributes 0.
  const now = Math.floor(Date.now() / 1000);
  const start = now - 86400;
  const step = 60; // 1-minute points for calculation; keep in sync with the [1m] rate window below

  // Best-effort fetch of one direction: a failed query is logged (instead of
  // being swallowed silently) and treated as "no data" so the other direction
  // and the caller can still proceed.
  const fetchSeries = (metric) =>
    queryRange(url, `sum(rate(${metric}{device!~"lo|veth.*|docker.*|br-.*"}[1m]))`, start, now, step)
      .catch((err) => {
        console.error(`Error fetching 24h traffic series for ${metric}:`, err && err.message ? err.message : err);
        return [];
      });

  const [rxResult, txResult] = await Promise.all([
    fetchSeries('node_network_receive_bytes_total'),
    fetchSeries('node_network_transmit_bytes_total')
  ]);

  // The query aggregates with sum(), so only the first series is read.
  // Anything that is not a non-empty array is treated as "no data".
  const firstSeriesValues = (result) =>
    (Array.isArray(result) && result.length > 0 ? result[0].values : []);

  return {
    rx: calculateTrafficFromHistory(firstSeriesValues(rxResult)),
    tx: calculateTrafficFromHistory(firstSeriesValues(txResult))
  };
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get network traffic history (past 24h, 5-min intervals for chart)
|
||||||
*/
|
*/
|
||||||
async function getNetworkHistory(url) {
|
async function getNetworkHistory(url) {
|
||||||
const now = Math.floor(Date.now() / 1000);
|
const now = Math.floor(Date.now() / 1000);
|
||||||
const start = now - 86400; // 24h ago
|
const start = now - 86400; // 24h ago
|
||||||
const step = 900; // 15 minutes
|
const step = 300; // 5 minutes for better resolution on chart
|
||||||
|
|
||||||
const [rxResult, txResult] = await Promise.all([
|
const [rxResult, txResult] = await Promise.all([
|
||||||
queryRange(url,
|
queryRange(url,
|
||||||
@@ -645,6 +678,7 @@ module.exports = {
|
|||||||
queryRange,
|
queryRange,
|
||||||
getTargets,
|
getTargets,
|
||||||
getOverviewMetrics,
|
getOverviewMetrics,
|
||||||
|
get24hTrafficSum,
|
||||||
getNetworkHistory,
|
getNetworkHistory,
|
||||||
mergeNetworkHistories,
|
mergeNetworkHistories,
|
||||||
getCpuHistory,
|
getCpuHistory,
|
||||||
|
|||||||
Reference in New Issue
Block a user