/*
 * File: PromdataPanel/server/prometheus-service.js
 * Commit: CN-JS-HuiBai 2f131b09c7 — bug fix ("修复BUG")
 * Date: 2026-04-04 19:15:40 +08:00
 * 470 lines, 14 KiB, JavaScript
 */
const axios = require('axios');
const http = require('http');
const https = require('https');
// Per-request timeout (ms) shared by every Prometheus HTTP call in this module.
const QUERY_TIMEOUT = 10000;
// Reusable agents to handle potential redirect issues and protocol mismatches.
// These are only wired into the axios client built by createClient(); the
// fetch-based helpers below manage their own connections.
// NOTE(review): rejectUnauthorized:false disables TLS certificate validation —
// presumably for self-signed internal Prometheus endpoints; confirm intended.
const httpAgent = new http.Agent({ keepAlive: true });
const httpsAgent = new https.Agent({ keepAlive: true, rejectUnauthorized: false });
/**
 * Normalize a user-supplied Prometheus address: trim whitespace, strip any
 * trailing slashes, and default to http:// when no protocol is present.
 * @param {string} baseUrl - raw base URL or bare host:port.
 * @returns {string} normalized URL with a protocol and no trailing slash.
 */
function normalizeUrl(baseUrl) {
  const trimmed = baseUrl.trim().replace(/\/+$/, '');
  const hasProtocol = /^https?:\/\//i.test(trimmed);
  return hasProtocol ? trimmed : `http://${trimmed}`;
}
/**
 * Build a preconfigured axios instance targeting a Prometheus server.
 * Shares the module-level keep-alive agents and follows up to 5 redirects.
 * @param {string} baseUrl - host or full URL of the Prometheus server.
 * @returns {object} axios instance bound to the normalized base URL.
 */
function createClient(baseUrl) {
  const config = {
    baseURL: normalizeUrl(baseUrl),
    timeout: QUERY_TIMEOUT,
    httpAgent,
    httpsAgent,
    maxRedirects: 5
  };
  return axios.create(config);
}
/**
 * Test connectivity to a Prometheus server by fetching its build info.
 *
 * Uses native fetch to avoid follow-redirects/axios "protocol mismatch"
 * issues seen in some Node environments; Node's fetch handles http/https
 * automatically.
 *
 * @param {string} url - Prometheus base URL (protocol optional).
 * @returns {Promise<string>} Prometheus version string, or 'unknown'.
 * @throws {Error} 'Connection timed out' on abort; otherwise the HTTP or
 *   network error encountered.
 */
async function testConnection(url) {
  const normalized = normalizeUrl(url);
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), QUERY_TIMEOUT);
  try {
    const res = await fetch(`${normalized}/api/v1/status/buildinfo`, {
      signal: controller.signal
    });
    if (!res.ok) {
      throw new Error(`Prometheus returned HTTP ${res.status}`);
    }
    const data = await res.json();
    return data?.data?.version || 'unknown';
  } catch (err) {
    if (err.name === 'AbortError') {
      throw new Error('Connection timed out');
    }
    console.error(`[Prometheus] Connection test failed for ${normalized}:`, err.message);
    throw err;
  } finally {
    // Fix: the original cleared the timer only on the success path, so a
    // failed fetch leaked a pending abort timer that kept the event loop
    // alive for up to QUERY_TIMEOUT ms.
    clearTimeout(timer);
  }
}
/**
 * Execute a Prometheus instant query.
 * @param {string} baseUrl - Prometheus base URL (protocol optional).
 * @param {string} expr - PromQL expression.
 * @returns {Promise<Array>} the `data.result` series array.
 * @throws {Error} 'Prometheus query timed out' on abort; otherwise an HTTP,
 *   network, or Prometheus-status error.
 */
async function query(baseUrl, expr) {
  const url = normalizeUrl(baseUrl);
  const params = new URLSearchParams({ query: expr });
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), QUERY_TIMEOUT);
  try {
    const res = await fetch(`${url}/api/v1/query?${params.toString()}`, {
      signal: controller.signal
    });
    if (!res.ok) {
      throw new Error(`Prometheus returned HTTP ${res.status}`);
    }
    const data = await res.json();
    // Prometheus wraps results: a 200 response can still carry status:"error".
    if (data.status !== 'success') {
      throw new Error(`Prometheus query failed: ${data.error || 'unknown error'}`);
    }
    return data.data.result;
  } catch (err) {
    if (err.name === 'AbortError') {
      throw new Error('Prometheus query timed out');
    }
    throw err;
  } finally {
    // Fix: always clear the abort timer — the original skipped clearTimeout
    // when fetch rejected, leaking a live timer per failed request.
    clearTimeout(timer);
  }
}
/**
 * Execute a Prometheus range query.
 * @param {string} baseUrl - Prometheus base URL (protocol optional).
 * @param {string} expr - PromQL expression.
 * @param {number|string} start - range start (unix seconds).
 * @param {number|string} end - range end (unix seconds).
 * @param {number|string} step - resolution step (seconds).
 * @returns {Promise<Array>} the `data.result` series array.
 * @throws {Error} 'Prometheus range query timed out' on abort; otherwise an
 *   HTTP, network, or Prometheus-status error.
 */
async function queryRange(baseUrl, expr, start, end, step) {
  const url = normalizeUrl(baseUrl);
  const params = new URLSearchParams({ query: expr, start, end, step });
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), QUERY_TIMEOUT);
  try {
    const res = await fetch(`${url}/api/v1/query_range?${params.toString()}`, {
      signal: controller.signal
    });
    if (!res.ok) {
      throw new Error(`Prometheus returned HTTP ${res.status}`);
    }
    const data = await res.json();
    // Prometheus wraps results: a 200 response can still carry status:"error".
    if (data.status !== 'success') {
      throw new Error(`Prometheus range query failed: ${data.error || 'unknown error'}`);
    }
    return data.data.result;
  } catch (err) {
    if (err.name === 'AbortError') {
      throw new Error('Prometheus range query timed out');
    }
    throw err;
  } finally {
    // Fix: always clear the abort timer — the original skipped clearTimeout
    // when fetch rejected, leaking a live timer per failed request.
    clearTimeout(timer);
  }
}
/**
 * Get overview metrics from a single Prometheus source.
 *
 * Fires a batch of instant queries in parallel against node-exporter style
 * metrics, builds a per-instance record keyed by the `instance` label, then
 * aggregates totals across all instances.
 *
 * @param {string} url - Prometheus base URL.
 * @param {string} sourceName - label attached to each server entry so the
 *   caller can tell which configured source it came from.
 * @returns {Promise<Object>} totals for cpu/memory/disk/network/traffic24h
 *   plus a `servers` array with one entry per instance.
 */
async function getOverviewMetrics(url, sourceName) {
  // Run all queries in parallel. Each query swallows its own failure and
  // yields [] so a single missing metric does not sink the whole overview.
  const [
    cpuResult,
    cpuCountResult,
    memTotalResult,
    memAvailResult,
    diskTotalResult,
    diskFreeResult,
    netRxResult,
    netTxResult,
    traffic24hRxResult,
    traffic24hTxResult,
    upResult
  ] = await Promise.all([
    // CPU usage per instance: 1 - avg idle
    query(url, '100 - (avg by (instance, job) (rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100)').catch(() => []),
    // CPU count per instance
    query(url, 'count by (instance, job) (node_cpu_seconds_total{mode="idle"})').catch(() => []),
    // Memory total per instance
    query(url, 'node_memory_MemTotal_bytes').catch(() => []),
    // Memory available per instance
    query(url, 'node_memory_MemAvailable_bytes').catch(() => []),
    // Disk total per instance (root filesystem + /data)
    query(url, 'sum by (instance, job) (node_filesystem_size_bytes{mountpoint=~"/|/data",fstype!="tmpfs"})').catch(() => []),
    // Disk free per instance (root filesystem + /data)
    query(url, 'sum by (instance, job) (node_filesystem_free_bytes{mountpoint=~"/|/data",fstype!="tmpfs"})').catch(() => []),
    // Network receive rate (bytes/sec), excluding loopback/virtual interfaces
    query(url, 'sum by (instance, job) (rate(node_network_receive_bytes_total{device!~"lo|veth.*|docker.*|br-.*"}[5m]))').catch(() => []),
    // Network transmit rate (bytes/sec)
    query(url, 'sum by (instance, job) (rate(node_network_transmit_bytes_total{device!~"lo|veth.*|docker.*|br-.*"}[5m]))').catch(() => []),
    // Total traffic received in last 24h
    query(url, 'sum by (instance, job) (increase(node_network_receive_bytes_total{device!~"lo|veth.*|docker.*|br-.*"}[24h]))').catch(() => []),
    // Total traffic transmitted in last 24h
    query(url, 'sum by (instance, job) (increase(node_network_transmit_bytes_total{device!~"lo|veth.*|docker.*|br-.*"}[24h]))').catch(() => []),
    // Up instances (at least one successful scrape in last 5m)
    // We broaden the job filter to catch more variations of node-exporter jobs
    query(url, 'max_over_time(up{job=~".*node.*|.*exporter.*|.*host.*"}[5m])').catch(() => [])
  ]);
  // Build per-instance data map, lazily creating a zeroed record the first
  // time any result series mentions an instance.
  const instances = new Map();
  const getOrCreate = (metric) => {
    const key = metric.instance;
    if (!instances.has(key)) {
      instances.set(key, {
        instance: key,
        job: metric.job || 'Unknown',
        source: sourceName,
        cpuPercent: 0,
        cpuCores: 0,
        memTotal: 0,
        memUsed: 0,
        diskTotal: 0,
        diskUsed: 0,
        netRx: 0,
        netTx: 0,
        up: false
      });
    }
    const inst = instances.get(key);
    // If job was Unknown but we now have a job name, update it
    if (inst.job === 'Unknown' && metric.job) {
      inst.job = metric.job;
    }
    return inst;
  };
  // Parse UP status (value is the instant-vector sample: [timestamp, value])
  for (const r of upResult) {
    const inst = getOrCreate(r.metric);
    inst.up = parseFloat(r.value[1]) === 1;
  }
  // Parse CPU usage
  for (const r of cpuResult) {
    const inst = getOrCreate(r.metric);
    inst.cpuPercent = parseFloat(r.value[1]) || 0;
  }
  // Parse CPU count
  for (const r of cpuCountResult) {
    const inst = getOrCreate(r.metric);
    inst.cpuCores = parseFloat(r.value[1]) || 0;
  }
  // Parse memory. NOTE: memUsed is derived as total - available, so the
  // MemTotal loop must run before this one (order matters here).
  for (const r of memTotalResult) {
    const inst = getOrCreate(r.metric);
    inst.memTotal = parseFloat(r.value[1]) || 0;
  }
  for (const r of memAvailResult) {
    const inst = getOrCreate(r.metric);
    inst.memUsed = inst.memTotal - (parseFloat(r.value[1]) || 0);
  }
  // Parse disk. Same ordering dependency: diskUsed = total - free.
  for (const r of diskTotalResult) {
    const inst = getOrCreate(r.metric);
    inst.diskTotal = parseFloat(r.value[1]) || 0;
  }
  for (const r of diskFreeResult) {
    const inst = getOrCreate(r.metric);
    inst.diskUsed = inst.diskTotal - (parseFloat(r.value[1]) || 0);
  }
  // Parse network rates
  for (const r of netRxResult) {
    const inst = getOrCreate(r.metric);
    inst.netRx = parseFloat(r.value[1]) || 0;
  }
  for (const r of netTxResult) {
    const inst = getOrCreate(r.metric);
    inst.netTx = parseFloat(r.value[1]) || 0;
  }
  // Final check: If an instance has non-zero CPU or Memory total data but is
  // marked offline, it means we missed its 'up' metric due to job labels,
  // but it's clearly sending data.
  for (const inst of instances.values()) {
    if (!inst.up && (inst.cpuPercent > 0 || inst.memTotal > 0)) {
      inst.up = true;
    }
  }
  // Aggregate per-instance figures into fleet-wide totals.
  let totalCpuUsed = 0, totalCpuCores = 0;
  let totalMemUsed = 0, totalMemTotal = 0;
  let totalDiskUsed = 0, totalDiskTotal = 0;
  let totalNetRx = 0, totalNetTx = 0;
  let totalTraffic24hRx = 0, totalTraffic24hTx = 0;
  for (const inst of instances.values()) {
    // Convert per-instance percentage into "cores used" so instances with
    // different core counts aggregate fairly.
    totalCpuUsed += (inst.cpuPercent / 100) * inst.cpuCores;
    totalCpuCores += inst.cpuCores;
    totalMemUsed += inst.memUsed;
    totalMemTotal += inst.memTotal;
    totalDiskUsed += inst.diskUsed;
    totalDiskTotal += inst.diskTotal;
    totalNetRx += inst.netRx;
    totalNetTx += inst.netTx;
  }
  // Parse 24h traffic (summed directly; not tracked per-instance)
  for (const r of traffic24hRxResult) {
    totalTraffic24hRx += parseFloat(r.value[1]) || 0;
  }
  for (const r of traffic24hTxResult) {
    totalTraffic24hTx += parseFloat(r.value[1]) || 0;
  }
  return {
    totalServers: instances.size,
    cpu: {
      used: totalCpuUsed,
      total: totalCpuCores,
      percent: totalCpuCores > 0 ? (totalCpuUsed / totalCpuCores * 100) : 0
    },
    memory: {
      used: totalMemUsed,
      total: totalMemTotal,
      percent: totalMemTotal > 0 ? (totalMemUsed / totalMemTotal * 100) : 0
    },
    disk: {
      used: totalDiskUsed,
      total: totalDiskTotal,
      percent: totalDiskTotal > 0 ? (totalDiskUsed / totalDiskTotal * 100) : 0
    },
    network: {
      rx: totalNetRx,
      tx: totalNetTx,
      total: totalNetRx + totalNetTx
    },
    traffic24h: {
      rx: totalTraffic24hRx,
      tx: totalTraffic24hTx
    },
    servers: Array.from(instances.values())
  };
}
/**
 * Get network traffic history (past 24h, 15-min intervals).
 * @param {string} url - Prometheus base URL.
 * @returns {Promise<{rxValues: Array, txValues: Array}>} raw [timestamp, value]
 *   pairs for receive and transmit rates (empty arrays on failure).
 */
async function getNetworkHistory(url) {
  const end = Math.floor(Date.now() / 1000);
  const begin = end - 86400; // window: last 24 hours
  const stepSeconds = 900;   // one sample every 15 minutes
  const rxPromise = queryRange(
    url,
    'sum(rate(node_network_receive_bytes_total{device!~"lo|veth.*|docker.*|br-.*"}[5m]))',
    begin, end, stepSeconds
  ).catch(() => []);
  const txPromise = queryRange(
    url,
    'sum(rate(node_network_transmit_bytes_total{device!~"lo|veth.*|docker.*|br-.*"}[5m]))',
    begin, end, stepSeconds
  ).catch(() => []);
  const [rxSeries, txSeries] = await Promise.all([rxPromise, txPromise]);
  // Each query sums into a single series, so take the first result's
  // values ([[timestamp, value], ...]) when present.
  return {
    rxValues: rxSeries.length > 0 ? rxSeries[0].values : [],
    txValues: txSeries.length > 0 ? txSeries[0].values : []
  };
}
/**
 * Merge network histories from multiple sources by summing rx/tx values
 * that share a timestamp.
 * @param {Array<{rxValues: Array, txValues: Array}>} histories - per-source
 *   histories as returned by getNetworkHistory.
 * @returns {{timestamps: number[], rx: number[], tx: number[]}} aligned,
 *   time-sorted series (timestamps in milliseconds).
 */
function mergeNetworkHistories(histories) {
  const byTimestamp = new Map();
  // Fold one [ts, value] pair list into the accumulator under `field`.
  const accumulate = (pairs, field) => {
    for (const [ts, raw] of pairs) {
      const bucket = byTimestamp.get(ts) || { rx: 0, tx: 0 };
      bucket[field] += parseFloat(raw) || 0;
      byTimestamp.set(ts, bucket);
    }
  };
  for (const history of histories) {
    accumulate(history.rxValues, 'rx');
    accumulate(history.txValues, 'tx');
  }
  const ordered = [...byTimestamp.entries()].sort((a, b) => a[0] - b[0]);
  return {
    timestamps: ordered.map(([ts]) => ts * 1000), // seconds -> ms for JS dates
    rx: ordered.map(([, bucket]) => bucket.rx),
    tx: ordered.map(([, bucket]) => bucket.tx)
  };
}
/**
 * Get CPU usage history (past 1h, 1-min intervals).
 * @param {string} url - Prometheus base URL.
 * @returns {Promise<Array>} raw [timestamp, value] pairs (empty on failure).
 */
async function getCpuHistory(url) {
  const end = Math.floor(Date.now() / 1000);
  const begin = end - 3600; // window: last hour
  const stepSeconds = 60;   // one sample per minute
  const series = await queryRange(
    url,
    '100 - (avg(rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100)',
    begin, end, stepSeconds
  ).catch(() => []);
  if (series.length === 0) {
    return [];
  }
  return series[0].values; // [[timestamp, value], ...]
}
/**
 * Merge CPU histories from multiple sources by averaging values that share
 * a timestamp.
 * @param {Array<Array>} histories - per-source [timestamp, value] pair lists.
 * @returns {{timestamps: number[], values: number[]}} time-sorted averages
 *   (timestamps in milliseconds).
 */
function mergeCpuHistories(histories) {
  const buckets = new Map();
  for (const series of histories) {
    for (const [ts, raw] of series) {
      let bucket = buckets.get(ts);
      if (!bucket) {
        bucket = { sum: 0, count: 0 };
        buckets.set(ts, bucket);
      }
      bucket.sum += parseFloat(raw) || 0;
      bucket.count += 1;
    }
  }
  const ordered = [...buckets.entries()].sort((a, b) => a[0] - b[0]);
  return {
    timestamps: ordered.map(([ts]) => ts * 1000), // seconds -> ms for JS dates
    values: ordered.map(([, bucket]) => bucket.sum / bucket.count)
  };
}
/**
 * Get aggregated traffic history range for preloading (past 24h, 5-min intervals).
 *
 * Returns one entry per timestamp with cumulative byte counters and
 * instantaneous bandwidth for RX/TX, summed over all non-virtual interfaces.
 *
 * @param {string} url - Prometheus base URL.
 * @returns {Promise<Array<{ts: number, rxBytes: number, txBytes: number,
 *   rxBW: number, txBW: number}>>} unsorted entries keyed by timestamp.
 */
async function getTrafficHistoryRange(url) {
  const now = Math.floor(Date.now() / 1000);
  const start = now - 86400; // 24h ago
  // Fix: 5-minute resolution (288 points over 24h), matching the documented
  // contract. The previous step of 5 seconds produced 17,280 points, which
  // exceeds Prometheus' default query_range limit of 11,000 points, so every
  // query was rejected and this function always returned [].
  const step = 300;
  const queries = [
    'sum(node_network_receive_bytes_total{device!~"lo|veth.*|docker.*|br-.*"})',
    'sum(node_network_transmit_bytes_total{device!~"lo|veth.*|docker.*|br-.*"})',
    'sum(rate(node_network_receive_bytes_total{device!~"lo|veth.*|docker.*|br-.*"}[5m]))',
    'sum(rate(node_network_transmit_bytes_total{device!~"lo|veth.*|docker.*|br-.*"}[5m]))'
  ];
  // Run all four range queries in parallel; each failure degrades to [].
  const results = await Promise.all(queries.map(q =>
    queryRange(url, q, start, now, step).catch(() => [])
  ));
  const [rxBytesRes, txBytesRes, rxBWRes, txBWRes] = results;
  // Map results by timestamp so the four series align into one entry each.
  const dataMap = new Map();
  const process = (res, field) => {
    if (res.length > 0 && res[0].values) {
      for (const [ts, val] of res[0].values) {
        const entry = dataMap.get(ts) || { ts, rxBytes: 0, txBytes: 0, rxBW: 0, txBW: 0 };
        entry[field] = parseFloat(val) || 0;
        dataMap.set(ts, entry);
      }
    }
  };
  process(rxBytesRes, 'rxBytes');
  process(txBytesRes, 'txBytes');
  process(rxBWRes, 'rxBW');
  process(txBWRes, 'txBW');
  return Array.from(dataMap.values());
}
// Public API. testConnection/query/queryRange are generic Prometheus HTTP
// helpers; the remaining functions implement the overview dashboard's
// per-source fetching and multi-source merging.
module.exports = {
testConnection,
query,
queryRange,
getOverviewMetrics,
getNetworkHistory,
mergeNetworkHistories,
getCpuHistory,
mergeCpuHistories,
getTrafficHistoryRange
};