const express = require('express'); const cors = require('cors'); const dotenv = require('dotenv'); const fs = require('fs'); const os = require('os'); const path = require('path'); const http = require('http'); const WebSocket = require('ws'); const ffmpeg = require('fluent-ffmpeg'); const { S3Client, ListBucketsCommand, ListObjectsV2Command, GetObjectCommand } = require('@aws-sdk/client-s3'); dotenv.config(); const app = express(); const PORT = process.env.PORT || 3000; const HOST = process.env.HOST || process.env.LISTEN_ADDRESS || '0.0.0.0'; const server = http.createServer(app); app.use(cors()); app.use(express.json()); app.use(express.static('public')); const rawBucketAddress = process.env.S3_BUCKET_ADDRESS || process.env.S3_BUCKET_NAME || ''; let BUCKET_NAME = rawBucketAddress; const parsedBucketUrl = rawBucketAddress.includes('://') ? (() => { try { const parsed = new URL(rawBucketAddress); const name = parsed.pathname.replace(/^\/+/, ''); if (name) { BUCKET_NAME = name; } return parsed.origin; } catch (err) { return undefined; } })() : undefined; const defaultS3ClientConfig = { region: process.env.AWS_REGION || 'us-east-1', endpoint: process.env.S3_ENDPOINT || parsedBucketUrl, forcePathStyle: process.env.S3_FORCE_PATH_STYLE === 'true' }; const createS3Client = (credentials) => { const clientConfig = { ...defaultS3ClientConfig }; if (credentials && credentials.username && credentials.password) { clientConfig.credentials = { accessKeyId: credentials.username, secretAccessKey: credentials.password }; } else if (process.env.AWS_ACCESS_KEY_ID && process.env.AWS_SECRET_ACCESS_KEY) { clientConfig.credentials = { accessKeyId: process.env.AWS_ACCESS_KEY_ID, secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY }; } return new S3Client(clientConfig); }; const progressMap = {}; const transcodeProcesses = new Map(); const wsSubscriptions = new Map(); const getProgressKey = (key) => key.split('/').map(segment => segment.replace(/[^a-zA-Z0-9_\-]/g, '_')).join('/'); const addWsClient = (progressKey, ws) => { if (!wsSubscriptions.has(progressKey)) { wsSubscriptions.set(progressKey, new Set()); } wsSubscriptions.get(progressKey).add(ws); ws.currentKey = progressKey; }; const removeWsClient = (ws) => { const key = ws.currentKey; if (!key) return; const set = wsSubscriptions.get(key); if (!set) return; set.delete(ws); if (set.size === 0) wsSubscriptions.delete(key); }; const broadcastWs = (key, payload) => { const clients = wsSubscriptions.get(key); if (!clients) return; const message = JSON.stringify(payload); for (const client of clients) { if (client.readyState === WebSocket.OPEN) { client.send(message); } } }; const createFfmpegOptions = (encoderName) => { const options = ['-preset fast']; if (encoderName === 'libx264' || encoderName === 'libx265') { options.push('-crf', '23', '-threads', '0'); } else if (/_nvenc$/.test(encoderName)) { options.push('-rc:v', 'vbr_hq', '-cq', '19'); } else if (/_qsv$/.test(encoderName)) { options.push('-global_quality', '23'); } else if (/_vaapi$/.test(encoderName)) { options.push('-qp', '23'); } return options; }; const shouldRetryWithSoftware = (message) => { if (!message) return false; return /Cannot load libcuda\.so\.1|Could not open encoder before EOF|Error while opening encoder|Operation not permitted|Invalid argument/i.test(message); }; const extractS3Credentials = (req) => { const username = req.headers['x-s3-username'] || req.body?.username || ''; const password = req.headers['x-s3-password'] || req.body?.password || ''; return { username: typeof username === 'string' ? username.trim() : '', password: typeof password === 'string' ? password : '' }; }; const wss = new WebSocket.Server({ server }); wss.on('connection', (ws) => { ws.on('message', (raw) => { try { const message = JSON.parse(raw.toString()); if (message.type === 'subscribe' && typeof message.key === 'string') { const progressKey = getProgressKey(message.key); if (ws.currentKey && ws.currentKey !== progressKey) { removeWsClient(ws); } addWsClient(progressKey, ws); const currentProgress = progressMap[progressKey]; if (currentProgress) { ws.send(JSON.stringify({ type: 'progress', key: message.key, progress: currentProgress })); if (currentProgress.status === 'finished' && currentProgress.mp4Url) { ws.send(JSON.stringify({ type: 'ready', key: message.key, mp4Url: currentProgress.mp4Url })); } } } } catch (error) { console.error('WebSocket parse error:', error); } }); ws.on('close', () => removeWsClient(ws)); }); const clearDownloadCache = () => { const tmpDir = os.tmpdir(); try { if (!fs.existsSync(tmpDir)) return; const files = fs.readdirSync(tmpDir); for (const file of files) { if (file.startsWith('s3-input-') && file.endsWith('.tmp')) { const filePath = path.join(tmpDir, file); fs.rmSync(filePath, { force: true }); } } } catch (err) { console.error('Failed to clear download cache:', err); throw err; } }; // Endpoint to list available buckets app.get('/api/buckets', async (req, res) => { try { const auth = extractS3Credentials(req); const s3Client = createS3Client(auth); const command = new ListBucketsCommand({}); const response = await s3Client.send(command); const buckets = response.Buckets || []; res.json({ buckets }); } catch (error) { console.error('Error listing buckets:', error); res.status(500).json({ error: 'Failed to list buckets', detail: error.message }); } }); // Endpoint to list videos in the bucket app.get('/api/videos', async (req, res) => { try { const bucket = req.query.bucket || BUCKET_NAME; if (!bucket) { return res.status(400).json({ error: 'Bucket name is required' }); } const allObjects = []; const auth = extractS3Credentials(req); const s3Client = createS3Client(auth); let continuationToken; do { const command = new ListObjectsV2Command({ Bucket: bucket, ContinuationToken: continuationToken, }); const response = await s3Client.send(command); allObjects.push(...(response.Contents || [])); continuationToken = response.IsTruncated ? response.NextContinuationToken : undefined; } while (continuationToken); // Filter for a broader set of common video formats const videoExtensions = [ '.3gp', '.3g2', '.asf', '.avi', '.divx', '.flv', '.m2ts', '.m2v', '.m4v', '.mkv', '.mov', '.mp4', '.mpeg', '.mpg', '.mts', '.mxf', '.ogm', '.ogv', '.qt', '.rm', '.rmvb', '.ts', '.vob', '.vro', '.webm', '.wmv' ]; const videos = allObjects .map(item => item.Key) .filter(key => { if (!key) return false; const lowerKey = key.toLowerCase(); return videoExtensions.some(ext => lowerKey.endsWith(ext)); }); res.json({ videos }); } catch (error) { console.error('Error fetching videos:', error); res.status(500).json({ error: 'Failed to fetch videos from S3', detail: error.message }); } }); app.get('/api/config', (req, res) => { const title = process.env.APP_TITLE || 'S3 Media Transcoder'; res.json({ title }); }); app.post('/api/clear-download-cache', (req, res) => { try { clearDownloadCache(); res.json({ message: 'Download cache cleared' }); } catch (error) { console.error('Error clearing download cache:', error); res.status(500).json({ error: 'Failed to clear download cache', detail: error.message }); } }); app.post('/api/stop-transcode', (req, res) => { try { const { key } = req.body; if (!key) { return res.status(400).json({ error: 'Video key is required' }); } const progressKey = getProgressKey(key); const command = transcodeProcesses.get(progressKey); if (!command) { return res.status(404).json({ error: 'No active transcode found for this key' }); } try { if (typeof command.kill === 'function') { command.kill('SIGKILL'); } } catch (killError) { console.warn('Failed to kill transcode process:', killError); } transcodeProcesses.delete(progressKey); progressMap[progressKey] = { status: 'cancelled', percent: 0, details: 'Transcode stopped by user', mp4Url: null }; broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] }); res.json({ message: 'Transcode stopped' }); } catch (error) { console.error('Error stopping transcode:', error); res.status(500).json({ error: 'Failed to stop transcode', detail: error.message }); } }); app.get('/api/stream', async (req, res) => { const bucket = req.query.bucket; const key = req.query.key; const codec = req.query.codec; const encoder = req.query.encoder; if (!bucket) { return res.status(400).json({ error: 'Bucket name is required' }); } if (!key) { return res.status(400).json({ error: 'Video key is required' }); } const safeCodec = codec === 'h265' ? 'h265' : 'h264'; const safeEncoder = ['nvidia', 'intel', 'vaapi', 'neon'].includes(encoder) ? encoder : 'software'; const codecMap = { software: { h264: 'libx264', h265: 'libx265' }, neon: { h264: 'libx264', h265: 'libx265' }, nvidia: { h264: 'h264_nvenc', h265: 'hevc_nvenc' }, intel: { h264: 'h264_qsv', h265: 'hevc_qsv' }, vaapi: { h264: 'h264_vaapi', h265: 'hevc_vaapi' } }; const videoCodec = codecMap[safeEncoder][safeCodec]; const safeKeySegments = key.split('/').map(segment => segment.replace(/[^a-zA-Z0-9_\-]/g, '_')); const progressKey = safeKeySegments.join('/'); const safeBucket = bucket.replace(/[^a-zA-Z0-9_\-]/g, '_'); const tmpInputPath = path.join(os.tmpdir(), `s3-input-${safeBucket}-${safeKeySegments.join('-')}.tmp`); const cacheExists = fs.existsSync(tmpInputPath); const auth = extractS3Credentials(req); const s3Client = createS3Client(auth); try { let totalBytes = 0; let downloadedBytes = 0; if (!cacheExists) { const command = new GetObjectCommand({ Bucket: bucket, Key: key }); const response = await s3Client.send(command); const s3Stream = response.Body; totalBytes = response.ContentLength || 0; progressMap[progressKey] = { status: 'downloading', percent: 0, downloadedBytes: 0, totalBytes, details: 'Downloading full source before streaming...', mp4Url: null }; broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] }); await new Promise((resolve, reject) => { const writeStream = fs.createWriteStream(tmpInputPath); s3Stream.on('data', (chunk) => { downloadedBytes += chunk.length; const percent = totalBytes ? Math.min(100, Math.round(downloadedBytes / totalBytes * 100)) : 0; const downloadState = { status: 'downloading', percent, downloadedBytes, totalBytes, details: totalBytes ? `Downloading source ${percent}%` : 'Downloading source...', mp4Url: null }; progressMap[progressKey] = downloadState; broadcastWs(progressKey, { type: 'progress', key, progress: downloadState }); }); s3Stream.on('error', reject); writeStream.on('error', reject); writeStream.on('finish', resolve); s3Stream.pipe(writeStream); }); progressMap[progressKey] = { status: 'downloaded', percent: 100, downloadedBytes, totalBytes, details: 'Source download complete, starting real-time transcode...', mp4Url: null }; broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] }); } else { const stats = fs.statSync(tmpInputPath); totalBytes = stats.size; downloadedBytes = totalBytes; progressMap[progressKey] = { status: 'downloaded', percent: 100, downloadedBytes, totalBytes, details: 'Source already downloaded locally, starting real-time transcode...', mp4Url: null }; broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] }); } res.setHeader('Content-Type', 'video/mp4'); res.setHeader('Cache-Control', 'no-cache'); res.setHeader('Connection', 'keep-alive'); let ffmpegCommand = null; const startStream = (encoderName) => { const streamingOptions = createFfmpegOptions(encoderName).concat(['-movflags', 'frag_keyframe+empty_moov+faststart']); ffmpegCommand = ffmpeg(tmpInputPath) .videoCodec(encoderName) .audioCodec('aac') .outputOptions(streamingOptions) .format('mp4'); if (/_vaapi$/.test(encoderName)) { ffmpegCommand .inputOptions(['-vaapi_device', '/dev/dri/renderD128']) .videoFilters('format=nv12,hwupload'); } transcodeProcesses.set(progressKey, ffmpegCommand); ffmpegCommand .on('progress', (progress) => { const progressState = { status: 'transcoding', percent: Math.min(Math.max(Math.round(progress.percent || 0), 0), 100), frame: progress.frames || null, fps: progress.currentFps || null, bitrate: progress.currentKbps || null, timemark: progress.timemark || null, details: `Streaming transcode ${Math.min(Math.max(Math.round(progress.percent || 0), 0), 100)}%`, mp4Url: null }; progressMap[progressKey] = progressState; broadcastWs(progressKey, { type: 'progress', key, progress: progressState }); }) .on('stderr', (stderrLine) => { console.log(`ffmpeg stderr: ${stderrLine}`); }) .on('end', () => { transcodeProcesses.delete(progressKey); progressMap[progressKey] = { status: 'finished', percent: 100, details: 'Streaming transcode complete', mp4Url: null }; broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] }); }) .on('error', (err) => { transcodeProcesses.delete(progressKey); const failedState = { status: 'failed', percent: progressMap[progressKey]?.percent || 0, details: err.message || 'Streaming transcode failed', mp4Url: null }; progressMap[progressKey] = failedState; broadcastWs(progressKey, { type: 'progress', key, progress: failedState }); if (!res.headersSent) { res.status(500).json({ error: 'Failed to stream transcoded video', detail: err.message }); } else { res.destroy(err); } }); ffmpegCommand.pipe(res, { end: true }); }; res.on('close', () => { if (ffmpegCommand && typeof ffmpegCommand.kill === 'function') { try { ffmpegCommand.kill('SIGKILL'); } catch (_) {} } }); startStream(videoCodec); } catch (error) { console.error('Error in stream:', error); if (!res.headersSent) { res.status(500).json({ error: 'Failed to stream video', detail: error.message }); } else { res.destroy(error); } } }); server.listen(PORT, HOST, () => { console.log(`Server running on http://${HOST}:${PORT}`); });