WebSocket 实时推送进度
This commit is contained in:
80
server.js
80
server.js
@@ -3,6 +3,8 @@ const cors = require('cors');
|
||||
const dotenv = require('dotenv');
|
||||
const fs = require('fs');
|
||||
const path = require('path');
|
||||
const http = require('http');
|
||||
const WebSocket = require('ws');
|
||||
const ffmpeg = require('fluent-ffmpeg');
|
||||
const { S3Client, ListObjectsV2Command, GetObjectCommand } = require('@aws-sdk/client-s3');
|
||||
|
||||
@@ -11,6 +13,7 @@ dotenv.config();
|
||||
const app = express();
|
||||
const PORT = process.env.PORT || 3000;
|
||||
const HOST = process.env.HOST || process.env.LISTEN_ADDRESS || '0.0.0.0';
|
||||
const server = http.createServer(app);
|
||||
|
||||
app.use(cors());
|
||||
app.use(express.json());
|
||||
@@ -29,6 +32,63 @@ const s3Client = new S3Client({
|
||||
|
||||
const BUCKET_NAME = process.env.S3_BUCKET_NAME;
|
||||
const progressMap = {};
|
||||
const wsSubscriptions = new Map();
|
||||
|
||||
const addWsClient = (key, ws) => {
|
||||
if (!wsSubscriptions.has(key)) {
|
||||
wsSubscriptions.set(key, new Set());
|
||||
}
|
||||
wsSubscriptions.get(key).add(ws);
|
||||
ws.currentKey = key;
|
||||
};
|
||||
|
||||
const removeWsClient = (ws) => {
|
||||
const key = ws.currentKey;
|
||||
if (!key) return;
|
||||
const set = wsSubscriptions.get(key);
|
||||
if (!set) return;
|
||||
set.delete(ws);
|
||||
if (set.size === 0) wsSubscriptions.delete(key);
|
||||
};
|
||||
|
||||
const broadcastWs = (key, payload) => {
|
||||
const clients = wsSubscriptions.get(key);
|
||||
if (!clients) return;
|
||||
const message = JSON.stringify(payload);
|
||||
for (const client of clients) {
|
||||
if (client.readyState === WebSocket.OPEN) {
|
||||
client.send(message);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const wss = new WebSocket.Server({ server });
|
||||
|
||||
wss.on('connection', (ws) => {
|
||||
ws.on('message', (raw) => {
|
||||
try {
|
||||
const message = JSON.parse(raw.toString());
|
||||
if (message.type === 'subscribe' && typeof message.key === 'string') {
|
||||
if (ws.currentKey && ws.currentKey !== message.key) {
|
||||
removeWsClient(ws);
|
||||
}
|
||||
addWsClient(message.key, ws);
|
||||
|
||||
const currentProgress = progressMap[message.key];
|
||||
if (currentProgress) {
|
||||
ws.send(JSON.stringify({ type: 'progress', key: message.key, progress: currentProgress }));
|
||||
if (currentProgress.status === 'finished' && currentProgress.hlsUrl) {
|
||||
ws.send(JSON.stringify({ type: 'ready', key: message.key, hlsUrl: currentProgress.hlsUrl }));
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('WebSocket parse error:', error);
|
||||
}
|
||||
});
|
||||
|
||||
ws.on('close', () => removeWsClient(ws));
|
||||
});
|
||||
|
||||
// Endpoint to list videos in the bucket
|
||||
app.get('/api/videos', async (req, res) => {
|
||||
@@ -88,7 +148,7 @@ app.post('/api/transcode', async (req, res) => {
|
||||
const m3u8Path = path.join(outputDir, 'index.m3u8');
|
||||
const hlsUrl = `/hls/${progressKey}/index.m3u8`;
|
||||
|
||||
progressMap[progressKey] = { status: 'pending', percent: 0, details: 'Waiting for ffmpeg to start' };
|
||||
progressMap[progressKey] = { status: 'pending', percent: 0, details: 'Waiting for ffmpeg to start', hlsUrl };
|
||||
|
||||
// If it already exists, just return the URL
|
||||
if (fs.existsSync(m3u8Path)) {
|
||||
@@ -124,23 +184,31 @@ app.post('/api/transcode', async (req, res) => {
|
||||
])
|
||||
.output(m3u8Path)
|
||||
.on('progress', (progress) => {
|
||||
progressMap[progressKey] = {
|
||||
const progressState = {
|
||||
status: 'transcoding',
|
||||
percent: Math.min(Math.max(Math.round(progress.percent || 0), 0), 100),
|
||||
frame: progress.frames || null,
|
||||
fps: progress.currentFps || null,
|
||||
bitrate: progress.currentKbps || null,
|
||||
timemark: progress.timemark || null,
|
||||
details: `Transcoding... ${Math.min(Math.max(Math.round(progress.percent || 0), 0), 100)}%`
|
||||
details: `Transcoding... ${Math.min(Math.max(Math.round(progress.percent || 0), 0), 100)}%`,
|
||||
hlsUrl
|
||||
};
|
||||
progressMap[progressKey] = progressState;
|
||||
broadcastWs(progressKey, { type: 'progress', key: progressKey, progress: progressState });
|
||||
})
|
||||
.on('end', () => {
|
||||
console.log(`Finished transcoding ${key} to HLS`);
|
||||
progressMap[progressKey] = { status: 'finished', percent: 100, details: 'Transcoding complete' };
|
||||
const progressState = { status: 'finished', percent: 100, details: 'Transcoding complete', hlsUrl };
|
||||
progressMap[progressKey] = progressState;
|
||||
broadcastWs(progressKey, { type: 'progress', key: progressKey, progress: progressState });
|
||||
broadcastWs(progressKey, { type: 'ready', key: progressKey, hlsUrl });
|
||||
})
|
||||
.on('error', (err) => {
|
||||
console.error(`Error transcoding ${key}:`, err);
|
||||
progressMap[progressKey] = { status: 'failed', percent: progressMap[progressKey]?.percent || 0, details: err.message || 'Transcoding failed' };
|
||||
const failedState = { status: 'failed', percent: progressMap[progressKey]?.percent || 0, details: err.message || 'Transcoding failed', hlsUrl };
|
||||
progressMap[progressKey] = failedState;
|
||||
broadcastWs(progressKey, { type: 'progress', key: progressKey, progress: failedState });
|
||||
})
|
||||
.run();
|
||||
|
||||
@@ -171,6 +239,6 @@ app.get('/api/status', (req, res) => {
|
||||
}
|
||||
});
|
||||
|
||||
app.listen(PORT, HOST, () => {
|
||||
server.listen(PORT, HOST, () => {
|
||||
console.log(`Server running on http://${HOST}:${PORT}`);
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user