435 lines
17 KiB
JavaScript
435 lines
17 KiB
JavaScript
const express = require('express');
|
|
const cors = require('cors');
|
|
const dotenv = require('dotenv');
|
|
const fs = require('fs');
|
|
const os = require('os');
|
|
const path = require('path');
|
|
const http = require('http');
|
|
const WebSocket = require('ws');
|
|
const ffmpeg = require('fluent-ffmpeg');
|
|
const { S3Client, ListObjectsV2Command, GetObjectCommand } = require('@aws-sdk/client-s3');
|
|
|
|
dotenv.config();
|
|
|
|
const app = express();
|
|
const PORT = process.env.PORT || 3000;
|
|
const HOST = process.env.HOST || process.env.LISTEN_ADDRESS || '0.0.0.0';
|
|
const server = http.createServer(app);
|
|
|
|
app.use(cors());
|
|
app.use(express.json());
|
|
app.use(express.static('public'));
|
|
|
|
const rawBucketAddress = process.env.S3_BUCKET_ADDRESS || process.env.S3_BUCKET_NAME || '';
|
|
let BUCKET_NAME = rawBucketAddress;
|
|
|
|
const parsedBucketUrl = rawBucketAddress.includes('://') ? (() => {
|
|
try {
|
|
const parsed = new URL(rawBucketAddress);
|
|
const name = parsed.pathname.replace(/^\/+/, '');
|
|
if (name) {
|
|
BUCKET_NAME = name;
|
|
}
|
|
return parsed.origin;
|
|
} catch (err) {
|
|
return undefined;
|
|
}
|
|
})() : undefined;
|
|
|
|
const defaultS3ClientConfig = {
|
|
region: process.env.AWS_REGION || 'us-east-1',
|
|
endpoint: process.env.S3_ENDPOINT || parsedBucketUrl,
|
|
forcePathStyle: process.env.S3_FORCE_PATH_STYLE === 'true'
|
|
};
|
|
|
|
const createS3Client = (credentials) => {
|
|
const clientConfig = { ...defaultS3ClientConfig };
|
|
if (credentials && credentials.username && credentials.password) {
|
|
clientConfig.credentials = {
|
|
accessKeyId: credentials.username,
|
|
secretAccessKey: credentials.password
|
|
};
|
|
} else if (process.env.AWS_ACCESS_KEY_ID && process.env.AWS_SECRET_ACCESS_KEY) {
|
|
clientConfig.credentials = {
|
|
accessKeyId: process.env.AWS_ACCESS_KEY_ID,
|
|
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY
|
|
};
|
|
}
|
|
return new S3Client(clientConfig);
|
|
};
|
|
|
|
const progressMap = {};
|
|
const wsSubscriptions = new Map();
|
|
|
|
const getProgressKey = (key) => key.split('/').map(segment => segment.replace(/[^a-zA-Z0-9_\-]/g, '_')).join('/');
|
|
|
|
const addWsClient = (progressKey, ws) => {
|
|
if (!wsSubscriptions.has(progressKey)) {
|
|
wsSubscriptions.set(progressKey, new Set());
|
|
}
|
|
wsSubscriptions.get(progressKey).add(ws);
|
|
ws.currentKey = progressKey;
|
|
};
|
|
|
|
const removeWsClient = (ws) => {
|
|
const key = ws.currentKey;
|
|
if (!key) return;
|
|
const set = wsSubscriptions.get(key);
|
|
if (!set) return;
|
|
set.delete(ws);
|
|
if (set.size === 0) wsSubscriptions.delete(key);
|
|
};
|
|
|
|
const broadcastWs = (key, payload) => {
|
|
const clients = wsSubscriptions.get(key);
|
|
if (!clients) return;
|
|
const message = JSON.stringify(payload);
|
|
for (const client of clients) {
|
|
if (client.readyState === WebSocket.OPEN) {
|
|
client.send(message);
|
|
}
|
|
}
|
|
};
|
|
|
|
const createFfmpegOptions = (encoderName) => {
|
|
const options = ['-preset fast'];
|
|
if (encoderName === 'libx264' || encoderName === 'libx265') {
|
|
options.push('-crf', '23');
|
|
} else if (/_nvenc$/.test(encoderName)) {
|
|
options.push('-rc:v', 'vbr_hq', '-cq', '19');
|
|
} else if (/_qsv$/.test(encoderName)) {
|
|
options.push('-global_quality', '23');
|
|
} else if (/_vaapi$/.test(encoderName)) {
|
|
options.push('-qp', '23');
|
|
}
|
|
return options;
|
|
};
|
|
|
|
const shouldRetryWithSoftware = (message) => {
|
|
if (!message) return false;
|
|
return /Cannot load libcuda\.so\.1|Could not open encoder before EOF|Error while opening encoder|Operation not permitted|Invalid argument/i.test(message);
|
|
};
|
|
|
|
const extractS3Credentials = (req) => {
|
|
const username = req.headers['x-s3-username'] || req.body?.username || '';
|
|
const password = req.headers['x-s3-password'] || req.body?.password || '';
|
|
return {
|
|
username: typeof username === 'string' ? username.trim() : '',
|
|
password: typeof password === 'string' ? password : ''
|
|
};
|
|
};
|
|
|
|
const wss = new WebSocket.Server({ server });
|
|
|
|
wss.on('connection', (ws) => {
|
|
ws.on('message', (raw) => {
|
|
try {
|
|
const message = JSON.parse(raw.toString());
|
|
if (message.type === 'subscribe' && typeof message.key === 'string') {
|
|
const progressKey = getProgressKey(message.key);
|
|
if (ws.currentKey && ws.currentKey !== progressKey) {
|
|
removeWsClient(ws);
|
|
}
|
|
addWsClient(progressKey, ws);
|
|
|
|
const currentProgress = progressMap[progressKey];
|
|
if (currentProgress) {
|
|
ws.send(JSON.stringify({ type: 'progress', key: message.key, progress: currentProgress }));
|
|
if (currentProgress.status === 'finished' && currentProgress.mp4Url) {
|
|
ws.send(JSON.stringify({ type: 'ready', key: message.key, mp4Url: currentProgress.mp4Url }));
|
|
}
|
|
}
|
|
}
|
|
} catch (error) {
|
|
console.error('WebSocket parse error:', error);
|
|
}
|
|
});
|
|
|
|
ws.on('close', () => removeWsClient(ws));
|
|
});
|
|
|
|
const clearMp4Cache = () => {
|
|
const mp4Dir = path.join(__dirname, 'public', 'mp4');
|
|
if (!fs.existsSync(mp4Dir)) return;
|
|
try {
|
|
fs.rmSync(mp4Dir, { recursive: true, force: true });
|
|
} catch (err) {
|
|
if (typeof fs.rmdirSync === 'function') {
|
|
fs.rmdirSync(mp4Dir, { recursive: true });
|
|
} else {
|
|
throw err;
|
|
}
|
|
}
|
|
};
|
|
|
|
// Endpoint to list videos in the bucket
|
|
app.get('/api/videos', async (req, res) => {
|
|
try {
|
|
if (!BUCKET_NAME) {
|
|
return res.status(500).json({ error: 'S3_BUCKET_NAME not configured' });
|
|
}
|
|
const allObjects = [];
|
|
const auth = extractS3Credentials(req);
|
|
const s3Client = createS3Client(auth);
|
|
let continuationToken;
|
|
|
|
do {
|
|
const command = new ListObjectsV2Command({
|
|
Bucket: BUCKET_NAME,
|
|
ContinuationToken: continuationToken,
|
|
});
|
|
const response = await s3Client.send(command);
|
|
allObjects.push(...(response.Contents || []));
|
|
continuationToken = response.IsTruncated ? response.NextContinuationToken : undefined;
|
|
} while (continuationToken);
|
|
|
|
// Filter for a broader set of common video formats
|
|
const videoExtensions = [
|
|
'.3gp', '.3g2', '.asf', '.avi', '.divx', '.flv', '.m2ts', '.m2v',
|
|
'.m4v', '.mkv', '.mov', '.mp4', '.mpeg', '.mpg', '.mts', '.mxf',
|
|
'.ogm', '.ogv', '.qt', '.rm', '.rmvb', '.ts', '.vob', '.vro',
|
|
'.webm', '.wmv'
|
|
];
|
|
const videos = allObjects
|
|
.map(item => item.Key)
|
|
.filter(key => {
|
|
if (!key) return false;
|
|
const lowerKey = key.toLowerCase();
|
|
return videoExtensions.some(ext => lowerKey.endsWith(ext));
|
|
});
|
|
|
|
res.json({ videos });
|
|
} catch (error) {
|
|
console.error('Error fetching videos:', error);
|
|
res.status(500).json({ error: 'Failed to fetch videos from S3', detail: error.message });
|
|
}
|
|
});
|
|
|
|
app.get('/api/config', (req, res) => {
|
|
const title = process.env.APP_TITLE || 'S3 Media Transcoder';
|
|
res.json({ title });
|
|
});
|
|
|
|
app.post('/api/reset-cache', (req, res) => {
|
|
try {
|
|
clearMp4Cache();
|
|
Object.keys(progressMap).forEach((key) => delete progressMap[key]);
|
|
res.json({ message: 'Cache reset' });
|
|
} catch (error) {
|
|
console.error('Error resetting cache:', error);
|
|
res.status(500).json({ error: 'Failed to reset cache', detail: error.message });
|
|
}
|
|
});
|
|
|
|
// Endpoint to transcode S3 video streaming to MP4
|
|
app.post('/api/transcode', async (req, res) => {
|
|
const { key, codec, encoder } = req.body;
|
|
|
|
if (!key) {
|
|
return res.status(400).json({ error: 'Video key is required' });
|
|
}
|
|
|
|
const safeCodec = codec === 'h265' ? 'h265' : 'h264';
|
|
const safeEncoder = ['nvidia', 'intel', 'vaapi'].includes(encoder) ? encoder : 'software';
|
|
const codecMap = {
|
|
software: { h264: 'libx264', h265: 'libx265' },
|
|
nvidia: { h264: 'h264_nvenc', h265: 'hevc_nvenc' },
|
|
intel: { h264: 'h264_qsv', h265: 'hevc_qsv' },
|
|
vaapi: { h264: 'h264_vaapi', h265: 'hevc_vaapi' }
|
|
};
|
|
const videoCodec = codecMap[safeEncoder][safeCodec];
|
|
|
|
try {
|
|
const safeKeySegments = key.split('/').map(segment => segment.replace(/[^a-zA-Z0-9_\-]/g, '_'));
|
|
const progressKey = safeKeySegments.join('/');
|
|
const outputDir = path.join(__dirname, 'public', 'mp4', ...safeKeySegments);
|
|
const mp4Path = path.join(outputDir, 'video.mp4');
|
|
const mp4Url = `/mp4/${progressKey}/video.mp4`;
|
|
|
|
progressMap[progressKey] = { status: 'pending', percent: 0, details: 'Waiting for ffmpeg to start', mp4Url };
|
|
|
|
// If it already exists, just return the URL
|
|
if (fs.existsSync(mp4Path)) {
|
|
return res.json({ message: 'Already transcoded', mp4Url });
|
|
}
|
|
|
|
// Create output directory if it doesn't exist
|
|
fs.mkdirSync(outputDir, { recursive: true });
|
|
|
|
// Get S3 stream
|
|
const auth = extractS3Credentials(req);
|
|
const s3Client = createS3Client(auth);
|
|
const command = new GetObjectCommand({
|
|
Bucket: BUCKET_NAME,
|
|
Key: key
|
|
});
|
|
|
|
const response = await s3Client.send(command);
|
|
const s3Stream = response.Body;
|
|
const totalBytes = response.ContentLength || 0;
|
|
let downloadedBytes = 0;
|
|
const tmpInputPath = path.join(os.tmpdir(), `s3-input-${Date.now()}-${Math.random().toString(16).slice(2)}.tmp`);
|
|
|
|
const broadcastDownloadProgress = () => {
|
|
const percent = totalBytes ? Math.min(100, Math.round(downloadedBytes / totalBytes * 100)) : 0;
|
|
const downloadState = {
|
|
status: 'downloading',
|
|
percent,
|
|
downloadedBytes,
|
|
totalBytes,
|
|
details: totalBytes ? `Downloading ${percent}%` : 'Downloading...',
|
|
mp4Url
|
|
};
|
|
progressMap[progressKey] = downloadState;
|
|
broadcastWs(progressKey, { type: 'progress', key, progress: downloadState });
|
|
};
|
|
|
|
await new Promise((resolve, reject) => {
|
|
const writeStream = fs.createWriteStream(tmpInputPath);
|
|
s3Stream.on('data', (chunk) => {
|
|
downloadedBytes += chunk.length;
|
|
broadcastDownloadProgress();
|
|
});
|
|
s3Stream.on('error', (err) => {
|
|
reject(err);
|
|
});
|
|
writeStream.on('error', reject);
|
|
writeStream.on('finish', resolve);
|
|
s3Stream.pipe(writeStream);
|
|
});
|
|
|
|
broadcastDownloadProgress();
|
|
progressMap[progressKey] = {
|
|
status: 'downloaded',
|
|
percent: 100,
|
|
downloadedBytes,
|
|
totalBytes,
|
|
details: 'Download complete, starting transcode...',
|
|
mp4Url
|
|
};
|
|
broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] });
|
|
|
|
// Triggers fluent-ffmpeg to transcode to MP4
|
|
console.log(`Starting transcoding for ${key} with codec ${videoCodec}`);
|
|
|
|
const cleanupTmpInput = () => {
|
|
fs.unlink(tmpInputPath, () => {});
|
|
};
|
|
|
|
let attemptedSoftwareFallback = false;
|
|
const startFfmpeg = (encoderName) => {
|
|
console.log(`Starting ffmpeg with encoder ${encoderName} for ${key}`);
|
|
const command = ffmpeg(tmpInputPath)
|
|
.videoCodec(encoderName)
|
|
.audioCodec('aac')
|
|
.outputOptions(createFfmpegOptions(encoderName));
|
|
if (/_vaapi$/.test(encoderName)) {
|
|
command
|
|
.inputOptions(['-vaapi_device', '/dev/dri/renderD128'])
|
|
.videoFilters('format=nv12,hwupload');
|
|
}
|
|
command
|
|
.format('mp4')
|
|
.output(mp4Path)
|
|
.on('progress', (progress) => {
|
|
const progressState = {
|
|
status: 'transcoding',
|
|
percent: Math.min(Math.max(Math.round(progress.percent || 0), 0), 100),
|
|
frame: progress.frames || null,
|
|
fps: progress.currentFps || null,
|
|
bitrate: progress.currentKbps || null,
|
|
timemark: progress.timemark || null,
|
|
details: `Transcoding... ${Math.min(Math.max(Math.round(progress.percent || 0), 0), 100)}%`,
|
|
mp4Url
|
|
};
|
|
progressMap[progressKey] = progressState;
|
|
broadcastWs(progressKey, { type: 'progress', key, progress: progressState });
|
|
})
|
|
.on('stderr', (stderrLine) => {
|
|
console.log(`ffmpeg stderr: ${stderrLine}`);
|
|
})
|
|
.on('end', () => {
|
|
cleanupTmpInput();
|
|
console.log(`Finished transcoding ${key} to MP4`);
|
|
let progressState;
|
|
try {
|
|
const stats = fs.statSync(mp4Path);
|
|
if (!stats.isFile() || stats.size === 0) {
|
|
throw new Error('Output MP4 is empty or missing');
|
|
}
|
|
progressState = { status: 'finished', percent: 100, details: 'Transcoding complete', mp4Url };
|
|
} catch (verifyError) {
|
|
console.error(`Output verification failed for ${mp4Path}:`, verifyError);
|
|
progressState = { status: 'failed', percent: progressMap[progressKey]?.percent || 0, details: `Output verification failed: ${verifyError.message}`, mp4Url };
|
|
}
|
|
progressMap[progressKey] = progressState;
|
|
broadcastWs(progressKey, { type: 'progress', key, progress: progressState });
|
|
if (progressState.status === 'finished') {
|
|
broadcastWs(progressKey, { type: 'ready', key, mp4Url });
|
|
}
|
|
})
|
|
.on('error', (err) => {
|
|
const errMessage = err?.message || '';
|
|
const isHardwareFailure = !attemptedSoftwareFallback && encoderName !== codecMap.software[safeCodec] && shouldRetryWithSoftware(errMessage);
|
|
if (isHardwareFailure) {
|
|
attemptedSoftwareFallback = true;
|
|
console.warn(`Hardware encoder failed for ${key}; retrying with software encoder`, errMessage);
|
|
try {
|
|
if (fs.existsSync(mp4Path)) {
|
|
fs.unlinkSync(mp4Path);
|
|
}
|
|
} catch (_) {}
|
|
const softwareEncoder = codecMap.software[safeCodec];
|
|
progressMap[progressKey] = {
|
|
status: 'fallback',
|
|
percent: 0,
|
|
details: 'Hardware encoder unavailable, retrying with software encoder...',
|
|
mp4Url
|
|
};
|
|
broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] });
|
|
startFfmpeg(softwareEncoder);
|
|
return;
|
|
}
|
|
|
|
cleanupTmpInput();
|
|
console.error(`Error transcoding ${key}:`, err);
|
|
const failedState = { status: 'failed', percent: progressMap[progressKey]?.percent || 0, details: err.message || 'Transcoding failed', mp4Url };
|
|
progressMap[progressKey] = failedState;
|
|
broadcastWs(progressKey, { type: 'progress', key, progress: failedState });
|
|
})
|
|
.run();
|
|
};
|
|
|
|
startFfmpeg(videoCodec);
|
|
|
|
// Return immediately so the client can start polling or waiting
|
|
res.json({ message: 'Transcoding started', mp4Url });
|
|
|
|
} catch (error) {
|
|
console.error('Error in transcode:', error);
|
|
res.status(500).json({ error: 'Failed to initiate transcoding', detail: error.message });
|
|
}
|
|
});
|
|
|
|
// Status check for MP4 availability
|
|
app.get('/api/status', (req, res) => {
|
|
const { key } = req.query;
|
|
if (!key) return res.status(400).json({ error: 'Key is required' });
|
|
|
|
const safeKeySegments = key.split('/').map(segment => segment.replace(/[^a-zA-Z0-9_\-]/g, '_'));
|
|
const progressKey = safeKeySegments.join('/');
|
|
const mp4Path = path.join(__dirname, 'public', 'mp4', ...safeKeySegments, 'video.mp4');
|
|
const progress = progressMap[progressKey] || null;
|
|
|
|
const outputReady = fs.existsSync(mp4Path) && (!progress || progress.status !== 'failed');
|
|
if (outputReady) {
|
|
res.json({ ready: true, mp4Url: `/mp4/${safeKeySegments.join('/')}/video.mp4`, progress });
|
|
} else {
|
|
res.json({ ready: false, progress });
|
|
}
|
|
});
|
|
|
|
server.listen(PORT, HOST, () => {
|
|
console.log(`Server running on http://${HOST}:${PORT}`);
|
|
});
|