const express = require('express');
const cors = require('cors');
const dotenv = require('dotenv');
const fs = require('fs');
const path = require('path');
const http = require('http');
const WebSocket = require('ws');
const ffmpeg = require('fluent-ffmpeg');
const { S3Client, ListObjectsV2Command, GetObjectCommand } = require('@aws-sdk/client-s3');

dotenv.config();

const app = express();
const PORT = process.env.PORT || 3000;
const HOST = process.env.HOST || process.env.LISTEN_ADDRESS || '0.0.0.0';
const server = http.createServer(app);

app.use(cors());
app.use(express.json());
app.use(express.static('public'));

// Configure AWS S3 Client
const s3Client = new S3Client({
  region: process.env.AWS_REGION || 'us-east-1',
  endpoint: process.env.S3_ENDPOINT,
  forcePathStyle: process.env.S3_FORCE_PATH_STYLE === 'true',
  credentials: {
    accessKeyId: process.env.AWS_ACCESS_KEY_ID,
    secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
  },
});

const BUCKET_NAME = process.env.S3_BUCKET_NAME;

// Object keys ending in one of these (case-insensitive) are listed as videos.
const VIDEO_EXTENSIONS = [
  '.3gp', '.3g2', '.asf', '.avi', '.divx', '.flv', '.m2ts', '.m2v', '.m4v',
  '.mkv', '.mov', '.mp4', '.mpeg', '.mpg', '.mts', '.mxf', '.ogm', '.ogv',
  '.qt', '.rm', '.rmvb', '.ts', '.vob', '.vro', '.webm', '.wmv',
];

// ffmpeg encoder name for each (encoder backend, codec) combination.
const CODEC_MAP = {
  software: { h264: 'libx264', h265: 'libx265' },
  nvidia: { h264: 'h264_nvenc', h265: 'hevc_nvenc' },
  intel: { h264: 'h264_qsv', h265: 'hevc_qsv' },
};

// Latest progress state per sanitized key. A Map (rather than a plain object)
// so that keys such as "__proto__" or "constructor" are ordinary entries.
const progressMap = new Map();

// Sanitized keys that currently have an ffmpeg job running (or being set up).
const activeTranscodes = new Set();

// Sanitized key -> Set of WebSocket clients subscribed to that key.
const wsSubscriptions = new Map();

/**
 * Split an S3 key on "/" and replace every character outside [a-zA-Z0-9_-]
 * in each segment with "_", so the segments are safe to use in a path/URL.
 * @param {string} key - Raw S3 object key.
 * @returns {string[]} Sanitized path segments.
 */
const toSafeKeySegments = (key) =>
  key.split('/').map((segment) => segment.replace(/[^a-zA-Z0-9_\-]/g, '_'));

/**
 * Clamp an ffmpeg progress percentage to an integer in [0, 100].
 * @param {number|undefined} percent - Value reported by fluent-ffmpeg.
 * @returns {number} Rounded percentage; 0 when the input is missing/NaN.
 */
const clampPercent = (percent) => Math.min(Math.max(Math.round(percent || 0), 0), 100);

/**
 * Delete a file, ignoring the case where it does not exist.
 * @param {string} filePath - Absolute path of the file to remove.
 */
const removeFileIfPresent = (filePath) => {
  fs.unlink(filePath, (error) => {
    if (error && error.code !== 'ENOENT') {
      console.error(`Failed to remove ${filePath}:`, error);
    }
  });
};

/**
 * Subscribe a socket to progress updates for a key.
 * @param {string} key - Subscription key.
 * @param {WebSocket} ws - Client socket; its `currentKey` is set to `key`.
 */
const addWsClient = (key, ws) => {
  if (!wsSubscriptions.has(key)) {
    wsSubscriptions.set(key, new Set());
  }
  wsSubscriptions.get(key).add(ws);
  ws.currentKey = key;
};

/**
 * Remove a socket from the subscription set named by its `currentKey`,
 * dropping the set once it is empty.
 * @param {WebSocket} ws - Client socket.
 */
const removeWsClient = (ws) => {
  const key = ws.currentKey;
  if (!key) return;
  const set = wsSubscriptions.get(key);
  if (!set) return;
  set.delete(ws);
  if (set.size === 0) wsSubscriptions.delete(key);
};

/**
 * Send a JSON payload to every open socket subscribed to a key.
 * @param {string} key - Subscription key.
 * @param {object} payload - Value serialized with JSON.stringify.
 */
const broadcastWs = (key, payload) => {
  const clients = wsSubscriptions.get(key);
  if (!clients) return;
  const message = JSON.stringify(payload);
  for (const client of clients) {
    if (client.readyState === WebSocket.OPEN) {
      client.send(message);
    }
  }
};

/**
 * Record a progress state for a key and push it to subscribers.
 * @param {string} progressKey - Sanitized key.
 * @param {object} progressState - State stored in progressMap and broadcast.
 */
const publishProgress = (progressKey, progressState) => {
  progressMap.set(progressKey, progressState);
  broadcastWs(progressKey, { type: 'progress', key: progressKey, progress: progressState });
};

const wss = new WebSocket.Server({ server });

wss.on('connection', (ws) => {
  // Clients send {"type":"subscribe","key":"..."}; a socket follows one key at a time.
  ws.on('message', (raw) => {
    try {
      const message = JSON.parse(raw.toString());
      if (message?.type === 'subscribe' && typeof message.key === 'string') {
        if (ws.currentKey && ws.currentKey !== message.key) {
          removeWsClient(ws);
        }
        addWsClient(message.key, ws);

        // Replay the latest known state so late subscribers are not left waiting.
        const currentProgress = progressMap.get(message.key);
        if (currentProgress) {
          ws.send(JSON.stringify({ type: 'progress', key: message.key, progress: currentProgress }));
          if (currentProgress.status === 'finished' && currentProgress.mp4Url) {
            ws.send(JSON.stringify({ type: 'ready', key: message.key, mp4Url: currentProgress.mp4Url }));
          }
        }
      }
    } catch (error) {
      console.error('WebSocket parse error:', error);
    }
  });

  // Without a listener, an 'error' event on the socket would be thrown as an
  // uncaught exception.
  ws.on('error', (error) => console.error('WebSocket error:', error));
  ws.on('close', () => removeWsClient(ws));
});

// Endpoint to list videos in the bucket
app.get('/api/videos', async (req, res) => {
  try {
    if (!BUCKET_NAME) {
      return res.status(500).json({ error: 'S3_BUCKET_NAME not configured' });
    }

    // Page through the listing until S3 reports it is no longer truncated.
    const allObjects = [];
    let continuationToken;
    do {
      const command = new ListObjectsV2Command({
        Bucket: BUCKET_NAME,
        ContinuationToken: continuationToken,
      });
      const response = await s3Client.send(command);
      allObjects.push(...(response.Contents || []));
      continuationToken = response.IsTruncated ? response.NextContinuationToken : undefined;
    } while (continuationToken);

    // Filter the accumulated pages (not a single page response) by extension.
    const videos = allObjects
      .map((item) => item.Key)
      .filter((key) => {
        if (!key) return false;
        const lowerKey = key.toLowerCase();
        return VIDEO_EXTENSIONS.some((ext) => lowerKey.endsWith(ext));
      });

    res.json({ videos });
  } catch (error) {
    console.error('Error fetching videos:', error);
    res.status(500).json({ error: 'Failed to fetch videos from S3', detail: error.message });
  }
});

// Endpoint to transcode S3 video streaming to MP4
app.post('/api/transcode', async (req, res) => {
  const { key, codec, encoder } = req.body ?? {};
  if (typeof key !== 'string' || key === '') {
    return res.status(400).json({ error: 'Video key is required' });
  }
  if (!BUCKET_NAME) {
    return res.status(500).json({ error: 'S3_BUCKET_NAME not configured' });
  }

  // Anything other than the known values falls back to software H.264.
  const safeCodec = codec === 'h265' ? 'h265' : 'h264';
  const safeEncoder = ['nvidia', 'intel'].includes(encoder) ? encoder : 'software';
  const videoCodec = CODEC_MAP[safeEncoder][safeCodec];

  const safeKeySegments = toSafeKeySegments(key);
  const progressKey = safeKeySegments.join('/');
  const outputDir = path.join(__dirname, 'public', 'mp4', ...safeKeySegments);
  const mp4Path = path.join(outputDir, 'video.mp4');
  // ffmpeg writes here; the file is renamed to mp4Path only after a clean
  // finish, so the existence of video.mp4 always means a complete file.
  const tempPath = path.join(outputDir, 'video.mp4.part');
  const mp4Url = `/mp4/${progressKey}/video.mp4`;

  // A job for this key is already running: do not start a second ffmpeg
  // writing to the same file, and do not reset the reported progress.
  if (activeTranscodes.has(progressKey)) {
    return res.json({ message: 'Transcoding already in progress', mp4Url });
  }

  // If it already exists, just return the URL
  if (fs.existsSync(mp4Path)) {
    publishProgress(progressKey, {
      status: 'finished',
      percent: 100,
      details: 'Transcoding complete',
      mp4Url,
    });
    broadcastWs(progressKey, { type: 'ready', key: progressKey, mp4Url });
    return res.json({ message: 'Already transcoded', mp4Url });
  }

  // Claim the key synchronously, before the first await, so concurrent
  // requests for the same key cannot both start a job.
  activeTranscodes.add(progressKey);
  progressMap.set(progressKey, {
    status: 'pending',
    percent: 0,
    details: 'Waiting for ffmpeg to start',
    mp4Url,
  });

  try {
    // Create output directory if it doesn't exist
    fs.mkdirSync(outputDir, { recursive: true });

    // Get S3 stream
    const command = new GetObjectCommand({ Bucket: BUCKET_NAME, Key: key });
    const response = await s3Client.send(command);
    const s3Stream = response.Body;

    // Triggers fluent-ffmpeg to transcode to MP4
    console.log(`Starting transcoding for ${key} with codec ${videoCodec}`);
    ffmpeg(s3Stream)
      .videoCodec(videoCodec)
      .audioCodec('aac')
      .outputOptions(['-preset fast', '-crf 23'])
      .format('mp4')
      .output(tempPath)
      .on('progress', (progress) => {
        const percent = clampPercent(progress.percent);
        publishProgress(progressKey, {
          status: 'transcoding',
          percent,
          frame: progress.frames || null,
          fps: progress.currentFps || null,
          bitrate: progress.currentKbps || null,
          timemark: progress.timemark || null,
          details: `Transcoding... ${percent}%`,
          mp4Url,
        });
      })
      .on('end', () => {
        // Publish the file under its final name before announcing it.
        fs.rename(tempPath, mp4Path, (renameError) => {
          activeTranscodes.delete(progressKey);
          if (renameError) {
            console.error(`Error transcoding ${key}:`, renameError);
            removeFileIfPresent(tempPath);
            publishProgress(progressKey, {
              status: 'failed',
              percent: progressMap.get(progressKey)?.percent || 0,
              details: renameError.message || 'Transcoding failed',
              mp4Url,
            });
            return;
          }
          console.log(`Finished transcoding ${key} to MP4`);
          publishProgress(progressKey, {
            status: 'finished',
            percent: 100,
            details: 'Transcoding complete',
            mp4Url,
          });
          broadcastWs(progressKey, { type: 'ready', key: progressKey, mp4Url });
        });
      })
      .on('error', (err) => {
        console.error(`Error transcoding ${key}:`, err);
        activeTranscodes.delete(progressKey);
        // Drop the partial output so it is never mistaken for a finished file.
        removeFileIfPresent(tempPath);
        publishProgress(progressKey, {
          status: 'failed',
          percent: progressMap.get(progressKey)?.percent || 0,
          details: err.message || 'Transcoding failed',
          mp4Url,
        });
      })
      .run();

    // Return immediately so the client can start polling or waiting
    res.json({ message: 'Transcoding started', mp4Url });
  } catch (error) {
    console.error('Error in transcode:', error);
    // Release the key and report the failure instead of leaving it "pending".
    activeTranscodes.delete(progressKey);
    publishProgress(progressKey, {
      status: 'failed',
      percent: 0,
      details: error.message || 'Transcoding failed',
      mp4Url,
    });
    res.status(500).json({ error: 'Failed to initiate transcoding', detail: error.message });
  }
});

// Status check for MP4 availability
app.get('/api/status', (req, res) => {
  const { key } = req.query;
  // Repeated query parameters arrive as arrays/objects; only a plain string is usable.
  if (typeof key !== 'string' || key === '') {
    return res.status(400).json({ error: 'Key is required' });
  }

  const safeKeySegments = toSafeKeySegments(key);
  const progressKey = safeKeySegments.join('/');
  const mp4Path = path.join(__dirname, 'public', 'mp4', ...safeKeySegments, 'video.mp4');
  const progress = progressMap.get(progressKey) || null;

  // Check if the MP4 file exists
  if (fs.existsSync(mp4Path)) {
    res.json({ ready: true, mp4Url: `/mp4/${progressKey}/video.mp4`, progress });
  } else {
    res.json({ ready: false, progress });
  }
});

server.listen(PORT, HOST, () => {
  console.log(`Server running on http://${HOST}:${PORT}`);
});