修改为直接与blackbox通信

This commit is contained in:
CN-JS-HuiBai
2026-04-06 00:24:33 +08:00
parent d4d2927963
commit e8b60ce28b
6 changed files with 167 additions and 36 deletions

View File

@@ -38,15 +38,22 @@ async function checkAndFixDatabase() {
console.log(`[Database Integrity] ✅ Missing tables created.`);
}
// Check for is_server_source in prometheus_sources
// Check for is_server_source and type in prometheus_sources
const [promColumns] = await db.query("SHOW COLUMNS FROM prometheus_sources");
const promColumnNames = promColumns.map(c => c.Field);
if (!promColumnNames.includes('is_server_source')) {
console.log(`[Database Integrity] ⚠️ Missing column 'is_server_source' in 'prometheus_sources'. Adding it...`);
await db.query("ALTER TABLE prometheus_sources ADD COLUMN is_server_source TINYINT(1) DEFAULT 1 AFTER description");
console.log(`[Database Integrity] ✅ Column 'is_server_source' added.`);
}
if (!promColumnNames.includes('type')) {
console.log(`[Database Integrity] ⚠️ Missing column 'type' in 'prometheus_sources'. Adding it...`);
await db.query("ALTER TABLE prometheus_sources ADD COLUMN type VARCHAR(50) DEFAULT 'prometheus' AFTER is_server_source");
console.log(`[Database Integrity] ✅ Column 'type' added.`);
}
// Check for new columns in site_settings
const [columns] = await db.query("SHOW COLUMNS FROM site_settings");
const columnNames = columns.map(c => c.Field);

View File

@@ -6,6 +6,7 @@ const db = require('./db');
const prometheusService = require('./prometheus-service');
const cache = require('./cache');
const geoService = require('./geo-service');
const latencyService = require('./latency-service');
const checkAndFixDatabase = require('./db-integrity-check');
const http = require('http');
const WebSocket = require('ws');
@@ -451,7 +452,14 @@ app.get('/api/sources', async (req, res) => {
// Test connectivity for each source
const sourcesWithStatus = await Promise.all(rows.map(async (source) => {
try {
const response = await prometheusService.testConnection(source.url);
let response;
if (source.type === 'blackbox') {
// Simple check for blackbox exporter
const res = await fetch(`${source.url.replace(/\/+$/, '')}/metrics`, { timeout: 3000 }).catch(() => null);
response = (res && res.ok) ? 'Blackbox Exporter Ready' : 'Connection Error';
} else {
response = await prometheusService.testConnection(source.url);
}
return { ...source, status: 'online', version: response };
} catch (e) {
return { ...source, status: 'offline', version: null };
@@ -466,15 +474,15 @@ app.get('/api/sources', async (req, res) => {
// Add a new Prometheus source
app.post('/api/sources', requireAuth, async (req, res) => {
let { name, url, description, is_server_source } = req.body;
let { name, url, description, is_server_source, type } = req.body;
if (!name || !url) {
return res.status(400).json({ error: 'Name and URL are required' });
}
if (!/^https?:\/\//i.test(url)) url = 'http://' + url;
try {
const [result] = await db.query(
'INSERT INTO prometheus_sources (name, url, description, is_server_source) VALUES (?, ?, ?, ?)',
[name, url, description || '', is_server_source === undefined ? 1 : (is_server_source ? 1 : 0)]
'INSERT INTO prometheus_sources (name, url, description, is_server_source, type) VALUES (?, ?, ?, ?, ?)',
[name, url, description || '', is_server_source === undefined ? 1 : (is_server_source ? 1 : 0), type || 'prometheus']
);
const [rows] = await db.query('SELECT * FROM prometheus_sources WHERE id = ?', [result.insertId]);
@@ -494,8 +502,8 @@ app.put('/api/sources/:id', requireAuth, async (req, res) => {
if (url && !/^https?:\/\//i.test(url)) url = 'http://' + url;
try {
await db.query(
'UPDATE prometheus_sources SET name = ?, url = ?, description = ?, is_server_source = ? WHERE id = ?',
[name, url, description || '', is_server_source ? 1 : 0, req.params.id]
'UPDATE prometheus_sources SET name = ?, url = ?, description = ?, is_server_source = ?, type = ? WHERE id = ?',
[name, url, description || '', is_server_source ? 1 : 0, type || 'prometheus', req.params.id]
);
// Clear network history cache
await cache.del('network_history_all');
@@ -523,11 +531,18 @@ app.delete('/api/sources/:id', requireAuth, async (req, res) => {
// Test connection to a Prometheus source
app.post('/api/sources/test', async (req, res) => {
let { url } = req.body;
let { url, type } = req.body;
if (url && !/^https?:\/\//i.test(url)) url = 'http://' + url;
try {
const version = await prometheusService.testConnection(url);
res.json({ status: 'ok', version });
let result;
if (type === 'blackbox') {
const resVal = await fetch(`${url.replace(/\/+$/, '')}/metrics`, { timeout: 5000 }).catch(() => null);
result = (resVal && resVal.ok) ? 'Blackbox Exporter Ready' : 'Connection Failed';
if (!resVal || !resVal.ok) throw new Error(result);
} else {
result = await prometheusService.testConnection(url);
}
res.json({ status: 'ok', version: result });
} catch (err) {
res.status(400).json({ status: 'error', message: err.message });
}
@@ -893,7 +908,14 @@ app.get('/api/metrics/latency', async (req, res) => {
}
const results = await Promise.all(routes.map(async (route) => {
const latency = await prometheusService.getLatency(route.url, route.latency_target);
// Try to get from Valkey first (filled by background latencyService)
let latency = await cache.get(`latency:route:${route.id}`);
// Fallback if not in cache (maybe service just started or failed)
if (latency === null) {
latency = await prometheusService.getLatency(route.url, route.latency_target);
}
return {
id: route.id,
source: route.latency_source,
@@ -933,8 +955,9 @@ async function broadcastMetrics() {
}
}
// Check and fix database integrity on startup
// Start services
checkAndFixDatabase();
latencyService.start();
const REFRESH_INT = parseInt(process.env.REFRESH_INTERVAL) || 5000;
setInterval(broadcastMetrics, REFRESH_INT);

77
server/latency-service.js Normal file
View File

@@ -0,0 +1,77 @@
const axios = require('axios');
const cache = require('./cache');
const db = require('./db');
const POLL_INTERVAL = 10000; // 10 seconds
async function pollLatency() {
try {
const [routes] = await db.query(`
SELECT r.*, s.url
FROM latency_routes r
JOIN prometheus_sources s ON r.source_id = s.id
WHERE s.type = 'blackbox'
`);
if (routes.length === 0) return;
// Poll each route
await Promise.allSettled(routes.map(async (route) => {
try {
// Blackbox exporter probe URL
// We assume ICMP module for now. If target is a URL, maybe use http_2xx
let module = 'icmp';
let target = route.latency_target;
if (target.startsWith('http://') || target.startsWith('https://')) {
module = 'http_2xx';
}
const probeUrl = `${route.url.replace(/\/+$/, '')}/probe?module=${module}&target=${encodeURIComponent(target)}`;
const startTime = Date.now();
const response = await axios.get(probeUrl, { timeout: 5000 });
const duration = (Date.now() - startTime) / 1000; // Fallback to local timing if parsing fails
// Parse prometheus text format for probe_duration_seconds
let latency = null;
const lines = response.data.split('\n');
for (const line of lines) {
// Match "probe_duration_seconds 0.123" or "probe_duration_seconds{...} 0.123"
const match = line.match(/^probe_duration_seconds(?:\{.*\})?\s+([\d.]+)/);
if (match) {
latency = parseFloat(match[1]) * 1000; // to ms
break;
}
}
if (latency === null) {
// Fallback to local response time if metric not found in output
latency = duration * 1000;
}
// Save to Valkey
await cache.set(`latency:route:${route.id}`, latency, 60);
// console.log(`[Latency] Route ${route.id} (${target}): ${latency.toFixed(2)}ms`);
} catch (err) {
// console.error(`[Latency] Error polling route ${route.id}:`, err.message);
await cache.set(`latency:route:${route.id}`, null, 60);
}
}));
} catch (err) {
console.error('[Latency] Service error:', err.message);
}
}
let intervalId = null;
function start() {
if (intervalId) clearInterval(intervalId);
pollLatency(); // initial run
intervalId = setInterval(pollLatency, POLL_INTERVAL);
console.log('[Latency] Background service started (polling Blackbox Exporter directly)');
}
module.exports = {
start
};