// JavaScript source file (1081 lines, 42 KiB)
const express = require('express');
|
|
const cors = require('cors');
|
|
const dotenv = require('dotenv');
|
|
const fs = require('fs');
|
|
const path = require('path');
|
|
const http = require('http');
|
|
const crypto = require('crypto');
|
|
const { PassThrough } = require('stream');
|
|
const WebSocket = require('ws');
|
|
const ffmpeg = require('fluent-ffmpeg');
|
|
const { S3Client, ListBucketsCommand, ListObjectsV2Command, GetObjectCommand } = require('@aws-sdk/client-s3');
|
|
|
|
// Load variables from a local .env file into process.env before any of them are read.
dotenv.config();

// Express application and the HTTP server that wraps it.
const app = express();
const server = http.createServer(app);

// Listen address: HOST takes precedence over LISTEN_ADDRESS.
const PORT = process.env.PORT || 3000;
const HOST = process.env.HOST || process.env.LISTEN_ADDRESS || '0.0.0.0';

// Locations of the ffmpeg / ffprobe binaries handed to the ffmpeg wrapper module.
const JELLYFIN_FFMPEG_PATH = process.env.JELLYFIN_FFMPEG_PATH || '/usr/lib/jellyfin-ffmpeg/ffmpeg';
const JELLYFIN_FFPROBE_PATH = process.env.JELLYFIN_FFPROBE_PATH || '/usr/lib/jellyfin-ffmpeg/ffprobe';

// Register each binary path only when the loaded module exposes the matching setter.
for (const [setterName, binaryPath] of [
  ['setFfmpegPath', JELLYFIN_FFMPEG_PATH],
  ['setFfprobePath', JELLYFIN_FFPROBE_PATH]
]) {
  if (typeof ffmpeg[setterName] === 'function') {
    ffmpeg[setterName](binaryPath);
  }
}

// Global middleware, in order: CORS, JSON body parsing, static files from "public".
for (const middleware of [cors(), express.json(), express.static('public')]) {
  app.use(middleware);
}
|
|
|
|
// Bucket configuration may be a bare bucket name or a URL of the form
// "<scheme>://<host>/<bucket>".
const rawBucketAddress = process.env.S3_BUCKET_ADDRESS || process.env.S3_BUCKET_NAME || '';

// Defaults to the raw value; replaced by the URL path when the address is a URL with a path.
let BUCKET_NAME = rawBucketAddress;

// Origin of the bucket URL (used as a fallback S3 endpoint), or undefined when the
// address is not a URL or cannot be parsed.
const parsedBucketUrl = (() => {
  if (!rawBucketAddress.includes('://')) {
    return undefined;
  }
  try {
    const bucketUrl = new URL(rawBucketAddress);
    const bucketFromPath = bucketUrl.pathname.replace(/^\/+/, '');
    if (bucketFromPath) {
      BUCKET_NAME = bucketFromPath;
    }
    return bucketUrl.origin;
  } catch (err) {
    return undefined;
  }
})();
|
|
|
|
// Base options copied into every S3 client. An explicit S3_ENDPOINT wins over the
// origin parsed from the bucket address.
const defaultS3ClientConfig = {
  region: process.env.AWS_REGION || 'us-east-1',
  endpoint: process.env.S3_ENDPOINT || parsedBucketUrl,
  forcePathStyle: process.env.S3_FORCE_PATH_STYLE === 'true'
};

// Connection string for the Valkey/Redis session store; empty string when unset.
const VALKEY_URL = process.env.VALKEY_URL || process.env.REDIS_URL || process.env.VALKEY_ADDRESS || '';

// Name of the cookie that carries the session id.
const SESSION_COOKIE_NAME = process.env.S3_SESSION_COOKIE_NAME || 'media_coding_s3_session';

// Session lifetime in seconds: falls back to 2592000 when unset or unparsable,
// and is never lower than 300.
const SESSION_TTL_SECONDS = (() => {
  const fallbackTtl = 2592000;
  const minimumTtl = 300;
  const configuredTtl = Number.parseInt(process.env.S3_SESSION_TTL_SECONDS || String(fallbackTtl), 10);
  return Math.max(minimumTtl, configuredTtl || fallbackTtl);
})();

// Cookies get the Secure attribute in production or when explicitly requested.
const isSecureCookie = process.env.NODE_ENV === 'production' || process.env.SESSION_COOKIE_SECURE === 'true';
|
|
|
|
// Directory containing this file; both cache directories live directly beneath it.
const PROJECT_ROOT = __dirname;

// "Downloads" holds cached source objects, "convert" holds transcode output.
const [DOWNLOADS_DIR, CONVERT_DIR] = ['Downloads', 'convert'].map(
  (directoryName) => path.join(PROJECT_ROOT, directoryName)
);
|
|
|
|
/**
 * Create a directory (and any missing parents) unless the path already exists.
 *
 * @param {string} dirPath - Directory path to create.
 * @returns {void}
 */
const ensureDirectory = (dirPath) => {
  if (fs.existsSync(dirPath)) {
    return;
  }
  fs.mkdirSync(dirPath, { recursive: true });
};
|
|
|
|
// Make sure both cache directories exist before any request handler touches them.
[DOWNLOADS_DIR, CONVERT_DIR].forEach((cacheDirectory) => ensureDirectory(cacheDirectory));

// Session-store state: the client stays null and the flag stays false until a
// connection has been established.
let valkeyEnabled = false;
let valkeyClient = null;
|
|
|
|
/**
 * Build an S3 client from the shared default configuration.
 *
 * Credential precedence: the supplied username/password pair, then the
 * AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment pair. When neither pair is
 * complete, no `credentials` entry is set on the configuration.
 *
 * @param {{username?: string, password?: string}|null|undefined} credentials
 * @returns {S3Client}
 */
const createS3Client = (credentials) => {
  const { AWS_ACCESS_KEY_ID: envAccessKeyId, AWS_SECRET_ACCESS_KEY: envSecretAccessKey } = process.env;

  let resolvedCredentials = null;
  if (credentials?.username && credentials?.password) {
    resolvedCredentials = {
      accessKeyId: credentials.username,
      secretAccessKey: credentials.password
    };
  } else if (envAccessKeyId && envSecretAccessKey) {
    resolvedCredentials = {
      accessKeyId: envAccessKeyId,
      secretAccessKey: envSecretAccessKey
    };
  }

  const clientConfig = resolvedCredentials
    ? { ...defaultS3ClientConfig, credentials: resolvedCredentials }
    : { ...defaultS3ClientConfig };
  return new S3Client(clientConfig);
};
|
|
|
|
// Shared in-memory state, all keyed by progress key:
//   progressMap        -> latest progress snapshot
//   transcodeProcesses -> running ffmpeg command entry
//   wsSubscriptions    -> Set of subscribed WebSocket clients
const progressMap = {};
const transcodeProcesses = new Map();
const wsSubscriptions = new Map();

// Video encoders offered to the client; each label is "<value> (<description>)".
const AVAILABLE_VIDEO_ENCODERS = [
  ['h264_rkmpp', 'RKMPP H.264'],
  ['hevc_rkmpp', 'RKMPP HEVC'],
  ['mjpeg_rkmpp', 'RKMPP MJPEG'],
  ['libx264', 'Software / NEON H.264'],
  ['libx265', 'Software / NEON H.265']
].map(([value, description]) => ({ value, label: `${value} (${description})` }));

// Video decoders offered to the client; only automatic selection is listed.
const AVAILABLE_VIDEO_DECODERS = [
  { value: 'auto', label: 'Auto Select Decoder' }
];

// Subtitle codec names treated as convertible to WebVTT.
const SUPPORTED_TEXT_SUBTITLE_CODECS = new Set([
  'subrip',
  'srt',
  'ass',
  'ssa',
  'mov_text',
  'text',
  'webvtt'
]);
|
|
|
|
/**
 * Replace every character outside [a-zA-Z0-9._-] with an underscore.
 *
 * @param {string} value - A single path segment (no slashes expected).
 * @returns {string}
 */
const sanitizePathSegment = (value) => value.replace(/[^a-zA-Z0-9._-]/g, '_');

/**
 * Derive the key used for progress tracking from an object key.
 *
 * Each "/"-separated segment is sanitized with sanitizePathSegment, the same rule
 * getCachePathParts applies when it derives its `progressKey`. The two derivations
 * must agree, otherwise WebSocket subscribers and stop requests look up a different
 * key than the one the stream handler publishes under (previously "." was replaced
 * here but kept there, so any key with a file extension never matched).
 *
 * @param {string} key - Object key, e.g. "shows/episode 1.mkv".
 * @returns {string} Sanitized key, e.g. "shows/episode_1.mkv".
 */
const getProgressKey = (key) => key.split('/').map((segment) => sanitizePathSegment(segment)).join('/');

/**
 * Build the session-store key under which a session's payload is saved.
 *
 * @param {string} sessionId
 * @returns {string}
 */
const getSessionStorageKey = (sessionId) => `media-coding-web:s3-session:${sessionId}`;
|
|
/**
 * Parse a raw `Cookie` request header into a name -> value object.
 *
 * Pairs without "=" or with an empty name are skipped. Values are percent-decoded.
 * A value that is not valid percent-encoding is kept verbatim instead of throwing,
 * so one malformed cookie sent by a client cannot fail the whole request.
 *
 * @param {unknown} cookieHeader - Header value; anything but a non-blank string yields {}.
 * @returns {Object<string, string>}
 */
const parseCookieHeader = (cookieHeader) => {
  if (typeof cookieHeader !== 'string' || !cookieHeader.trim()) {
    return {};
  }

  const cookies = {};
  for (const pair of cookieHeader.split(';')) {
    const separatorIndex = pair.indexOf('=');
    if (separatorIndex === -1) {
      continue;
    }

    const name = pair.slice(0, separatorIndex).trim();
    if (!name) {
      continue;
    }

    const rawValue = pair.slice(separatorIndex + 1).trim();
    try {
      cookies[name] = decodeURIComponent(rawValue);
    } catch (decodeError) {
      // decodeURIComponent throws URIError on sequences such as "%E0%A4%A".
      cookies[name] = rawValue;
    }
  }
  return cookies;
};
|
|
|
|
/**
 * Add a `Set-Cookie` value to the response without discarding values already set.
 *
 * @param {{getHeader: Function, setHeader: Function}} res - Response object.
 * @param {string} cookieValue - Serialized cookie to add.
 * @returns {void}
 */
const appendSetCookie = (res, cookieValue) => {
  const existing = res.getHeader('Set-Cookie');

  let headerValue;
  if (!existing) {
    headerValue = cookieValue;
  } else if (Array.isArray(existing)) {
    headerValue = existing.concat(cookieValue);
  } else {
    headerValue = [existing, cookieValue];
  }

  res.setHeader('Set-Cookie', headerValue);
};
|
|
|
|
/**
 * Attach the session cookie to the response.
 *
 * The cookie is HttpOnly, SameSite=Lax, scoped to "/", lives for
 * SESSION_TTL_SECONDS, and carries `Secure` when isSecureCookie is set.
 *
 * @param {object} res - Response object passed through to appendSetCookie.
 * @param {string} sessionId - Session identifier; URI-encoded before being written.
 * @returns {void}
 */
const setSessionCookie = (res, sessionId) => {
  const attributes = [
    `${SESSION_COOKIE_NAME}=${encodeURIComponent(sessionId)}`,
    'Path=/',
    'HttpOnly',
    'SameSite=Lax',
    `Max-Age=${SESSION_TTL_SECONDS}`,
    ...(isSecureCookie ? ['Secure'] : [])
  ];
  appendSetCookie(res, attributes.join('; '));
};
|
|
|
|
/**
 * Expire the session cookie by sending it back empty with Max-Age=0.
 *
 * @param {object} res - Response object passed through to appendSetCookie.
 * @returns {void}
 */
const clearSessionCookie = (res) => {
  const secureSuffix = isSecureCookie ? '; Secure' : '';
  const expiredCookie = `${SESSION_COOKIE_NAME}=; Path=/; HttpOnly; SameSite=Lax; Max-Age=0${secureSuffix}`;
  appendSetCookie(res, expiredCookie);
};
|
|
|
|
/**
 * Connect to the Valkey/Redis session store, if one is configured.
 *
 * Does nothing when VALKEY_URL is empty or a client already exists. On success the
 * module-level `valkeyClient` is set and `valkeyEnabled` becomes true. On failure
 * both are reset and a warning is logged; the error is not rethrown.
 *
 * `valkeyEnabled` follows the connection state: an 'error' event disables session
 * use and a later 'ready' event re-enables it. Previously the flag stayed false for
 * the rest of the process after the first error.
 *
 * @returns {Promise<void>}
 */
const initializeValkey = async () => {
  if (!VALKEY_URL || valkeyClient) {
    return;
  }

  // Connection URLs can embed a password (scheme://user:secret@host); mask it so
  // the secret never reaches the logs.
  const describeValkeyUrl = () => {
    try {
      const parsedUrl = new URL(VALKEY_URL);
      if (parsedUrl.password) {
        parsedUrl.password = '***';
      }
      return parsedUrl.toString();
    } catch (parseError) {
      return '<unparseable url>';
    }
  };

  try {
    const redisModule = require('redis');
    const client = redisModule.createClient({ url: VALKEY_URL });
    valkeyClient = client;

    client.on('error', (error) => {
      console.error('[valkey] client error:', error);
      valkeyEnabled = false;
    });
    client.on('ready', () => {
      // Ignore events from a client that has since been discarded.
      if (valkeyClient === client) {
        valkeyEnabled = true;
      }
    });

    await client.connect();
    valkeyEnabled = true;
    console.log(`[valkey] connected url=${describeValkeyUrl()}`);
  } catch (error) {
    valkeyClient = null;
    valkeyEnabled = false;
    console.warn('[valkey] unavailable, falling back to request-scoped auth only:', error.message);
  }
};
|
|
|
|
/**
 * Persist a credential pair in the session store and hand the client a session cookie.
 *
 * @param {object} res - Response object that receives the session cookie.
 * @param {{username?: string, password?: string}|null|undefined} credentials
 * @returns {Promise<string|null>} The new session id, or null when the store is
 *   unavailable or either credential is missing.
 */
const createValkeySession = async (res, credentials) => {
  const storeReady = valkeyEnabled && valkeyClient;
  const hasCredentials = credentials?.username && credentials?.password;
  if (!storeReady || !hasCredentials) {
    return null;
  }

  const sessionId = crypto.randomUUID();
  const payload = JSON.stringify({
    username: credentials.username,
    password: credentials.password
  });

  // EX makes the stored entry expire after the session TTL.
  await valkeyClient.set(getSessionStorageKey(sessionId), payload, { EX: SESSION_TTL_SECONDS });
  setSessionCookie(res, sessionId);
  console.log(`[auth] session stored sessionId=${sessionId} ttl=${SESSION_TTL_SECONDS}s`);
  return sessionId;
};
|
|
|
|
/**
 * Resolve the credentials stored for the session cookie on a request.
 *
 * A successful lookup also resets the stored entry's expiry to SESSION_TTL_SECONDS.
 *
 * @param {{headers: {cookie?: string}}} req - Incoming request.
 * @returns {Promise<{username: string, password: string, source: 'session', sessionId: string}|null>}
 *   null when the store is unavailable, the cookie is absent, nothing is stored
 *   for it, or the stored payload is malformed.
 */
const loadValkeySession = async (req) => {
  if (!(valkeyEnabled && valkeyClient)) {
    return null;
  }

  const sessionId = parseCookieHeader(req.headers.cookie || '')[SESSION_COOKIE_NAME];
  if (!sessionId) {
    return null;
  }

  const storageKey = getSessionStorageKey(sessionId);
  const rawValue = await valkeyClient.get(storageKey);
  if (!rawValue) {
    return null;
  }

  try {
    const stored = JSON.parse(rawValue);
    const isWellFormed = Boolean(stored)
      && typeof stored.username === 'string'
      && typeof stored.password === 'string';
    if (!isWellFormed) {
      return null;
    }

    await valkeyClient.expire(storageKey, SESSION_TTL_SECONDS);
    const { username, password } = stored;
    return { username, password, source: 'session', sessionId };
  } catch (error) {
    console.error(`[auth] invalid session payload sessionId=${sessionId}:`, error);
    return null;
  }
};
|
|
|
|
/**
 * Generate an identifier for one streaming session: the current epoch milliseconds,
 * a dash, and up to eight base-36 characters taken from Math.random().
 *
 * @returns {string}
 */
const createStreamSessionId = () => {
  const timestamp = Date.now();
  const randomSuffix = Math.random().toString(36).slice(2, 10);
  return `${timestamp}-${randomSuffix}`;
};
|
|
/**
 * Derive the on-disk cache locations and the progress key for a bucket/key pair.
 *
 * Every key segment and the bucket name are sanitized. Cache file names are the
 * bucket and all segments joined with "--"; the convert file adds a ".tmp" suffix.
 *
 * @param {string} bucket - Bucket name.
 * @param {string} key - Object key; "/" separates segments.
 * @returns {{progressKey: string, safeKeySegments: string[], safeBucket: string,
 *   downloadPath: string, convertPath: string}}
 */
const getCachePathParts = (bucket, key) => {
  const safeKeySegments = key.split('/').map((part) => sanitizePathSegment(part));
  const safeBucket = sanitizePathSegment(bucket);

  // Flattened name shared by the download file and the convert file.
  const flatName = [safeBucket, ...safeKeySegments].join('--');

  return {
    progressKey: safeKeySegments.join('/'),
    safeKeySegments,
    safeBucket,
    downloadPath: path.join(DOWNLOADS_DIR, flatName),
    convertPath: path.join(CONVERT_DIR, `${flatName}.tmp`)
  };
};
|
|
|
|
/**
 * Subscribe a WebSocket client to updates for a progress key and remember the key
 * on the socket as `currentKey`.
 *
 * @param {string} progressKey
 * @param {object} ws - WebSocket client.
 * @returns {void}
 */
const addWsClient = (progressKey, ws) => {
  let subscribers = wsSubscriptions.get(progressKey);
  if (!subscribers) {
    subscribers = new Set();
    wsSubscriptions.set(progressKey, subscribers);
  }
  subscribers.add(ws);
  ws.currentKey = progressKey;
};
|
|
|
|
/**
 * Unsubscribe a WebSocket client from the key stored in its `currentKey`, dropping
 * the key's subscriber set once it becomes empty.
 *
 * @param {object} ws - WebSocket client.
 * @returns {void}
 */
const removeWsClient = (ws) => {
  const subscribedKey = ws.currentKey;
  const subscribers = subscribedKey ? wsSubscriptions.get(subscribedKey) : undefined;
  if (!subscribers) {
    return;
  }

  subscribers.delete(ws);
  if (subscribers.size === 0) {
    wsSubscriptions.delete(subscribedKey);
  }
};
|
|
|
|
/**
 * Send a JSON-serialized payload to every open WebSocket subscribed to a key.
 *
 * @param {string} key - Progress key whose subscribers are notified.
 * @param {*} payload - Value serialized once with JSON.stringify.
 * @returns {void}
 */
const broadcastWs = (key, payload) => {
  const subscribers = wsSubscriptions.get(key);
  if (!subscribers) {
    return;
  }

  const serialized = JSON.stringify(payload);
  subscribers.forEach((subscriber) => {
    // Sockets that are connecting, closing or closed are skipped.
    if (subscriber.readyState !== WebSocket.OPEN) {
      return;
    }
    subscriber.send(serialized);
  });
};
|
|
|
|
/**
 * Pick quality/preset output options for a video encoder.
 *
 * The first matching rule wins; encoders matching no rule get an empty list.
 *
 * @param {string} encoderName - ffmpeg encoder name, e.g. "libx264" or "h264_rkmpp".
 * @returns {string[]} A fresh array of ffmpeg output arguments.
 */
const createFfmpegOptions = (encoderName) => {
  const rules = [
    {
      applies: (name) => name === 'libx264' || name === 'libx265',
      args: ['-preset', 'fast', '-crf', '23', '-threads', '0']
    },
    {
      applies: (name) => /_nvenc$/.test(name),
      args: ['-preset', 'fast', '-rc:v', 'vbr_hq', '-cq', '19']
    },
    {
      applies: (name) => /_qsv$/.test(name),
      args: ['-preset', 'fast', '-global_quality', '23']
    },
    {
      applies: (name) => /_vaapi$/.test(name),
      args: ['-qp', '23']
    },
    {
      applies: (name) => /_rkmpp$/.test(name),
      args: ['-qp_init', '23', '-pix_fmt', 'nv12']
    }
  ];

  const matchedRule = rules.find((rule) => rule.applies(encoderName));
  return matchedRule ? [...matchedRule.args] : [];
};
|
|
|
|
/**
 * Whether a codec name ends in "_rkmpp".
 *
 * @param {string} codecName
 * @returns {boolean}
 */
const isRkmppCodec = (codecName) => String(codecName).endsWith('_rkmpp');

/**
 * Whether a codec name ends in "_vaapi".
 *
 * @param {string} codecName
 * @returns {boolean}
 */
const isVaapiCodec = (codecName) => String(codecName).endsWith('_vaapi');
|
|
// Lookup sets of the encoder / decoder values listed in the AVAILABLE_VIDEO_* tables.
const availableEncoderValues = new Set(Array.from(AVAILABLE_VIDEO_ENCODERS, ({ value }) => value));
const availableDecoderValues = new Set(Array.from(AVAILABLE_VIDEO_DECODERS, ({ value }) => value));
|
|
|
|
// Source codec name (as reported in probe metadata) -> matching RKMPP decoder.
// A Map is used so that only these exact names can match; a plain-object lookup
// would also return inherited members for names like "constructor" or "__proto__".
const RKMPP_DECODERS_BY_CODEC = new Map([
  ['av1', 'av1_rkmpp'],
  ['h263', 'h263_rkmpp'],
  ['h264', 'h264_rkmpp'],
  ['hevc', 'hevc_rkmpp'],
  ['mjpeg', 'mjpeg_rkmpp'],
  ['mpeg1video', 'mpeg1_rkmpp'],
  ['mpeg2video', 'mpeg2_rkmpp'],
  ['mpeg4', 'mpeg4_rkmpp'],
  ['vp8', 'vp8_rkmpp'],
  ['vp9', 'vp9_rkmpp']
]);

/**
 * Choose the RKMPP decoder for the first video stream in probe metadata.
 *
 * @param {{streams?: Array<{codec_type?: string, codec_name?: string}>}|null|undefined} metadata
 * @returns {string|null} Decoder name, or null when there is no video stream or
 *   its codec (compared case-insensitively) has no RKMPP decoder in the table.
 */
const getRkmppDecoderName = (metadata) => {
  const videoStream = (metadata?.streams || []).find((stream) => stream.codec_type === 'video');
  const codecName = (videoStream?.codec_name || '').toLowerCase();
  return RKMPP_DECODERS_BY_CODEC.get(codecName) ?? null;
};
|
|
|
|
/**
 * Convert a frame-rate string to a number.
 *
 * Accepts a plain number ("25") or a fraction ("30000/1001"); only the first two
 * "/"-separated parts of a fraction are used.
 *
 * @param {unknown} fpsText
 * @returns {number} The parsed rate, or 0 for non-strings, blank strings,
 *   unparsable input and zero denominators.
 */
const parseFpsValue = (fpsText) => {
  if (typeof fpsText !== 'string' || fpsText.trim() === '') {
    return 0;
  }

  if (!fpsText.includes('/')) {
    const plainRate = parseFloat(fpsText);
    return Number.isFinite(plainRate) ? plainRate : 0;
  }

  const fractionParts = fpsText.split('/');
  const numerator = parseFloat(fractionParts[0]);
  const denominator = parseFloat(fractionParts[1]);
  const isUsableFraction = Number.isFinite(numerator)
    && Number.isFinite(denominator)
    && denominator !== 0;
  return isUsableFraction ? numerator / denominator : 0;
};
|
|
|
|
/**
 * Build ffmpeg output options for fragmented MP4 with a fixed keyframe interval.
 *
 * The frame rate comes from the first video stream (avg_frame_rate, then
 * r_frame_rate, then 24), is rounded and clamped to 12..60, and the GOP size is
 * twice that rate with a floor of 24. Encoder-specific keyframe options are
 * appended for libx264/libx265, *_nvenc, *_qsv and *_rkmpp.
 *
 * @param {string} encoderName - ffmpeg encoder name.
 * @param {{streams?: Array<object>}|null|undefined} metadata - Probe metadata.
 * @returns {string[]} ffmpeg output arguments.
 */
const getSeekFriendlyOutputOptions = (encoderName, metadata) => {
  const streams = metadata?.streams || [];
  const videoStream = streams.find((stream) => stream.codec_type === 'video') || {};

  const detectedFps = parseFpsValue(videoStream.avg_frame_rate)
    || parseFpsValue(videoStream.r_frame_rate)
    || 24;
  const clampedFps = Math.max(12, Math.min(60, Math.round(detectedFps)));
  const gopSize = String(Math.max(24, clampedFps * 2));

  const baseOptions = [
    '-movflags', 'frag_keyframe+empty_moov+default_base_moof+faststart',
    '-frag_duration', '1000000',
    '-min_frag_duration', '1000000',
    '-g', gopSize,
    '-keyint_min', gopSize
  ];

  // Expression passed to -force_key_frames by several encoders below.
  const forcedKeyFrames = ['-force_key_frames', 'expr:gte(t,n_forced*2)'];

  let encoderOptions = [];
  if (encoderName === 'libx264' || encoderName === 'libx265') {
    encoderOptions = ['-sc_threshold', '0', ...forcedKeyFrames];
  } else if (/_nvenc$/.test(encoderName)) {
    encoderOptions = ['-forced-idr', '1', ...forcedKeyFrames];
  } else if (/_qsv$/.test(encoderName)) {
    encoderOptions = ['-idr_interval', '1'];
  } else if (/_rkmpp$/.test(encoderName)) {
    encoderOptions = forcedKeyFrames;
  }

  return [...baseOptions, ...encoderOptions];
};
|
|
|
|
/**
 * Decide whether an error message matches one of the known encoder-failure texts
 * that qualify a transcode for a retry with a software encoder.
 *
 * @param {string|null|undefined} message - Error text to inspect.
 * @returns {boolean} false for empty messages or when no known pattern matches.
 */
const shouldRetryWithSoftware = (message) => {
  const retryablePattern = /Cannot load libcuda\.so\.1|Could not open encoder before EOF|Error while opening encoder|Operation not permitted|Invalid argument|mpp_create|rkmpp/i;
  return message ? retryablePattern.test(message) : false;
};
|
|
|
|
/**
 * Make sure an S3 object is present in the local cache, downloading it if needed.
 *
 * An existing file at `targetPath` counts as a cache hit. Otherwise the object is
 * streamed into a uniquely named temporary file next to `targetPath` and renamed
 * into place only after the write has finished. A failed or still-running download
 * therefore never leaves a truncated file at `targetPath` that a later call would
 * mistake for a complete cached copy. On failure both streams are torn down and the
 * temporary file is removed.
 *
 * @param {object} options
 * @param {{send: Function}} options.s3Client - Client used to send the GetObject command.
 * @param {string} options.bucket - Bucket name.
 * @param {string} options.key - Object key.
 * @param {string} options.targetPath - Final location of the cached file.
 * @param {Function} [options.onProgress] - Called with
 *   `{ totalBytes, downloadedBytes, cacheExists: false }` once before the first
 *   chunk and again after each received chunk. Not called on a cache hit.
 * @param {{log: Function, error: Function}} [options.logger=console]
 * @returns {Promise<{totalBytes: number, downloadedBytes: number, cacheExists: boolean}>}
 *   `totalBytes` is the reported ContentLength (0 when absent) or, on a cache hit,
 *   the size of the cached file.
 * @throws Rethrows any request, stream or filesystem error after logging it.
 */
const ensureSourceCached = async ({ s3Client, bucket, key, targetPath, onProgress, logger = console }) => {
  if (fs.existsSync(targetPath)) {
    const { size } = fs.statSync(targetPath);
    logger.log(`[download] cache hit bucket=${bucket} key=${key} path=${targetPath} bytes=${size}`);
    return {
      totalBytes: size,
      downloadedBytes: size,
      cacheExists: true
    };
  }

  logger.log(`[download] request start bucket=${bucket} key=${key} target=${targetPath}`);
  const reportProgress = typeof onProgress === 'function' ? onProgress : () => {};
  // Unique per call so concurrent downloads of the same object never share a file.
  const partialPath = `${targetPath}.${crypto.randomUUID()}.part`;

  try {
    const response = await s3Client.send(new GetObjectCommand({ Bucket: bucket, Key: key }));
    const s3Stream = response.Body;
    const totalBytes = response.ContentLength || 0;
    let downloadedBytes = 0;

    logger.log(`[download] response received bucket=${bucket} key=${key} totalBytes=${totalBytes || 0}`);
    reportProgress({ totalBytes, downloadedBytes, cacheExists: false });

    await new Promise((resolve, reject) => {
      const writeStream = fs.createWriteStream(partialPath);
      let failed = false;

      const fail = (error) => {
        if (failed) {
          return;
        }
        failed = true;
        if (typeof s3Stream.destroy === 'function') {
          s3Stream.destroy();
        }
        // Reject only once the file handle is closed so that the cleanup below
        // cannot race with a still-pending open of the temporary file.
        writeStream.once('close', () => reject(error));
        writeStream.destroy();
      };

      s3Stream.on('data', (chunk) => {
        downloadedBytes += chunk.length;
        reportProgress({ totalBytes, downloadedBytes, cacheExists: false });
      });
      s3Stream.on('error', fail);
      writeStream.on('error', fail);
      writeStream.on('finish', resolve);
      s3Stream.pipe(writeStream);
    });

    // Publish the finished download under its final name.
    await fs.promises.rename(partialPath, targetPath);

    logger.log(`[download] request complete bucket=${bucket} key=${key} path=${targetPath} downloadedBytes=${downloadedBytes} totalBytes=${totalBytes || 0}`);

    return {
      totalBytes,
      downloadedBytes,
      cacheExists: false
    };
  } catch (error) {
    // Best-effort cleanup; the original error is the one worth reporting.
    await fs.promises.rm(partialPath, { force: true }).catch(() => {});
    logger.error(`[download] request failed bucket=${bucket} key=${key} target=${targetPath}:`, error);
    throw error;
  }
};
|
|
|
|
/**
 * Describe the subtitle streams found in probe metadata.
 *
 * The label joins, with " / ": the upper-cased language (or "Subtitle N", where N
 * is the 1-based position among subtitle streams), the title, the codec, and
 * "Unsupported" for codecs outside SUPPORTED_TEXT_SUBTITLE_CODECS. Empty parts are
 * left out.
 *
 * @param {{streams?: Array<object>}|null|undefined} metadata - Probe metadata.
 * @returns {Array<{index: number, codec: string, language: string, title: string,
 *   supported: boolean, label: string}>}
 */
const getSubtitleTracks = (metadata) => {
  const subtitleStreams = (metadata?.streams || []).filter((stream) => stream.codec_type === 'subtitle');

  return subtitleStreams.map((stream, position) => {
    const codec = (stream.codec_name || '').toLowerCase();
    const language = stream.tags?.language || '';
    const title = stream.tags?.title || '';
    const supported = SUPPORTED_TEXT_SUBTITLE_CODECS.has(codec);

    const labelParts = [language ? language.toUpperCase() : `Subtitle ${position + 1}`];
    if (title) {
      labelParts.push(title);
    }
    if (codec) {
      labelParts.push(codec);
    }
    if (!supported) {
      labelParts.push('Unsupported');
    }

    return {
      index: stream.index,
      codec,
      language,
      title,
      supported,
      label: labelParts.join(' / ')
    };
  });
};
|
|
|
|
/**
 * Promise wrapper around the callback-style `ffmpeg.ffprobe`.
 *
 * @param {string} filePath - File to inspect.
 * @returns {Promise<object>} Resolves with the metadata handed to the callback;
 *   rejects with the callback's error.
 */
const probeFile = (filePath) => new Promise((resolve, reject) => {
  ffmpeg.ffprobe(filePath, (err, metadata) => {
    if (err) {
      reject(err);
      return;
    }
    resolve(metadata);
  });
});
|
|
|
|
/**
 * Convert a timemark to seconds.
 *
 * "HH:MM:SS(.fraction)" is converted field by field. Any other shape is handed to
 * parseFloat as a whole.
 *
 * @param {unknown} timemark
 * @returns {number} Seconds, or 0 for non-strings, blank strings and unparsable input.
 */
const parseTimemarkToSeconds = (timemark) => {
  if (typeof timemark !== 'string' || timemark.trim() === '') {
    return 0;
  }

  const fields = timemark.trim().split(':');
  if (fields.length !== 3) {
    const plainSeconds = parseFloat(timemark);
    return Number.isFinite(plainSeconds) ? plainSeconds : 0;
  }

  const hours = parseInt(fields[0], 10);
  const minutes = parseInt(fields[1], 10);
  const seconds = parseFloat(fields[2]);
  const allFinite = Number.isFinite(hours) && Number.isFinite(minutes) && Number.isFinite(seconds);

  return allFinite ? (hours * 3600) + (minutes * 60) + seconds : 0;
};
|
|
|
|
/**
 * Stop the transcode registered for a progress key, if there is one.
 *
 * The command's progress/stderr/end/error listeners are detached before it is
 * killed with SIGKILL. The registry entry is removed even when detaching or
 * killing throws.
 *
 * @param {string} progressKey
 * @returns {boolean} false when no entry with a command exists for the key,
 *   true otherwise.
 */
const stopActiveTranscode = (progressKey) => {
  const command = transcodeProcesses.get(progressKey)?.command;
  if (!command) {
    return false;
  }

  try {
    if (typeof command.removeAllListeners === 'function') {
      for (const eventName of ['progress', 'stderr', 'end', 'error']) {
        command.removeAllListeners(eventName);
      }
    }
    if (typeof command.kill === 'function') {
      command.kill('SIGKILL');
    }
  } catch (killError) {
    console.warn(`Failed to kill transcode process for ${progressKey}:`, killError);
  } finally {
    transcodeProcesses.delete(progressKey);
  }

  return true;
};
|
|
|
|
/**
 * Work out which S3 credentials apply to a request.
 *
 * Credentials sent with the request are looked up in this order: the
 * x-s3-username / x-s3-password headers, the body's username / password, then the
 * query's username|accessKeyId / password|secretAccessKey. When both values are
 * present they are returned with source "direct". Otherwise the stored session is
 * tried. If that yields nothing either, the (incomplete) direct values are
 * returned.
 *
 * @param {{headers: object, body?: object, query?: object}} req
 * @returns {Promise<{username: string, password: string, source: string, sessionId?: string}>}
 */
const extractS3Credentials = async (req) => {
  const query = req.query || {};
  const firstTruthy = (...candidates) => candidates.find(Boolean) || '';

  const rawUsername = firstTruthy(
    req.headers['x-s3-username'],
    req.body?.username,
    query.username,
    query.accessKeyId
  );
  const rawPassword = firstTruthy(
    req.headers['x-s3-password'],
    req.body?.password,
    query.password,
    query.secretAccessKey
  );

  // Non-string values are discarded; only the username is trimmed.
  const directCredentials = {
    username: typeof rawUsername === 'string' ? rawUsername.trim() : '',
    password: typeof rawPassword === 'string' ? rawPassword : '',
    source: 'direct'
  };
  if (directCredentials.username && directCredentials.password) {
    return directCredentials;
  }

  const sessionCredentials = await loadValkeySession(req);
  if (sessionCredentials?.username && sessionCredentials?.password) {
    console.log(`[auth] using valkey session sessionId=${sessionCredentials.sessionId}`);
    return sessionCredentials;
  }

  return directCredentials;
};
|
|
|
|
// WebSocket server sharing the HTTP server; clients use it to follow the progress
// of one object key at a time.
const wss = new WebSocket.Server({ server });

wss.on('connection', (ws) => {
  // A socket-level 'error' event with no listener is thrown as an uncaught
  // exception, so always attach one.
  ws.on('error', (error) => {
    console.error('WebSocket client error:', error);
  });

  ws.on('message', (raw) => {
    try {
      const message = JSON.parse(raw.toString());
      if (message.type === 'subscribe' && typeof message.key === 'string') {
        const progressKey = getProgressKey(message.key);

        // A client follows a single key: drop the previous subscription first.
        if (ws.currentKey && ws.currentKey !== progressKey) {
          removeWsClient(ws);
        }
        addWsClient(progressKey, ws);

        // Replay the latest known state so a late subscriber does not have to
        // wait for the next update.
        const currentProgress = progressMap[progressKey];
        if (currentProgress) {
          if (typeof currentProgress.duration === 'number' && currentProgress.duration > 0) {
            ws.send(JSON.stringify({ type: 'duration', key: message.key, duration: currentProgress.duration }));
          }
          ws.send(JSON.stringify({ type: 'progress', key: message.key, progress: currentProgress }));
          if (currentProgress.status === 'finished' && currentProgress.mp4Url) {
            ws.send(JSON.stringify({ type: 'ready', key: message.key, mp4Url: currentProgress.mp4Url }));
          }
        }
      }
    } catch (error) {
      console.error('WebSocket parse error:', error);
    }
  });

  ws.on('close', () => removeWsClient(ws));
});
|
|
|
|
|
|
/**
 * Delete every regular file directly inside the downloads directory.
 * Subdirectories are left untouched.
 *
 * @returns {void}
 * @throws Rethrows any filesystem error after logging it.
 */
const clearDownloadCache = () => {
  try {
    ensureDirectory(DOWNLOADS_DIR);
    fs.readdirSync(DOWNLOADS_DIR).forEach((entryName) => {
      const entryPath = path.join(DOWNLOADS_DIR, entryName);
      if (!fs.statSync(entryPath).isFile()) {
        return;
      }
      fs.rmSync(entryPath, { force: true });
    });
  } catch (err) {
    console.error('Failed to clear download cache:', err);
    throw err;
  }
};
|
|
|
|
/**
 * Delete every regular file directly inside the convert directory.
 * Subdirectories are left untouched.
 *
 * @returns {void}
 * @throws Rethrows any filesystem error after logging it.
 */
const clearConvertCache = () => {
  try {
    ensureDirectory(CONVERT_DIR);
    fs.readdirSync(CONVERT_DIR).forEach((entryName) => {
      const entryPath = path.join(CONVERT_DIR, entryName);
      if (!fs.statSync(entryPath).isFile()) {
        return;
      }
      fs.rmSync(entryPath, { force: true });
    });
  } catch (err) {
    console.error('Failed to clear convert cache:', err);
    throw err;
  }
};
|
|
|
|
|
|
// Endpoint to list available buckets.
// Credentials supplied directly with the request are stored as a session after a
// successful listing; a failure while using session credentials clears the cookie.
app.get('/api/buckets', async (req, res) => {
  let auth = null;
  try {
    auth = await extractS3Credentials(req);
    const endpointLabel = defaultS3ClientConfig.endpoint || 'aws-default';
    console.log(`[s3] list buckets start endpoint=${endpointLabel} region=${defaultS3ClientConfig.region} authProvided=${Boolean(auth.username)}`);

    const listing = await createS3Client(auth).send(new ListBucketsCommand({}));
    const buckets = listing.Buckets || [];

    const suppliedDirectly = auth.source === 'direct' && auth.username && auth.password;
    if (suppliedDirectly) {
      await createValkeySession(res, auth);
    }

    console.log(`[s3] list buckets complete count=${buckets.length}`);
    res.json({ buckets });
  } catch (error) {
    if (auth?.source === 'session') {
      clearSessionCookie(res);
    }
    console.error('[s3] list buckets failed:', error);
    res.status(500).json({ error: 'Failed to list buckets', detail: error.message });
  }
});
|
|
|
|
// Endpoint to list videos in the bucket.
// Pages through the whole bucket listing and returns the keys whose name ends in
// one of the known video extensions (compared case-insensitively).
app.get('/api/videos', async (req, res) => {
  try {
    const bucket = req.query.bucket || BUCKET_NAME;
    if (!bucket) {
      return res.status(400).json({ error: 'Bucket name is required' });
    }

    const auth = await extractS3Credentials(req);
    const s3Client = createS3Client(auth);
    const allObjects = [];
    let continuationToken;
    let pageNumber = 0;
    let hasMorePages = true;

    console.log(`[s3] scan bucket start bucket=${bucket} endpoint=${defaultS3ClientConfig.endpoint || 'aws-default'} authProvided=${Boolean(auth.username)}`);

    while (hasMorePages) {
      pageNumber += 1;
      console.log(`[s3] scan bucket page request bucket=${bucket} page=${pageNumber} continuationToken=${continuationToken || 'none'}`);

      const response = await s3Client.send(new ListObjectsV2Command({
        Bucket: bucket,
        ContinuationToken: continuationToken
      }));
      const pageItems = response.Contents || [];
      allObjects.push(...pageItems);
      console.log(`[s3] scan bucket page result bucket=${bucket} page=${pageNumber} objects=${pageItems.length} truncated=${Boolean(response.IsTruncated)} nextToken=${response.NextContinuationToken || 'none'} scannedTotal=${allObjects.length}`);

      // Keep paging only while the listing is truncated and a token was returned.
      continuationToken = response.IsTruncated ? response.NextContinuationToken : undefined;
      hasMorePages = Boolean(continuationToken);
    }

    // Filter for a broader set of common video formats
    const videoExtensions = [
      '.3gp', '.3g2', '.asf', '.avi', '.divx', '.flv', '.m2ts', '.m2v',
      '.m4v', '.mkv', '.mov', '.mp4', '.mpeg', '.mpg', '.mts', '.mxf',
      '.ogm', '.ogv', '.qt', '.rm', '.rmvb', '.ts', '.vob', '.vro',
      '.webm', '.wmv'
    ];
    const videos = [];
    for (const { Key: objectKey } of allObjects) {
      if (!objectKey) {
        continue;
      }
      const lowerKey = objectKey.toLowerCase();
      if (videoExtensions.some((extension) => lowerKey.endsWith(extension))) {
        videos.push(objectKey);
      }
    }

    console.log(`[s3] scan bucket complete bucket=${bucket} pages=${pageNumber} objects=${allObjects.length} videos=${videos.length}`);
    res.json({ videos });
  } catch (error) {
    console.error(`[s3] scan bucket failed bucket=${req.query.bucket || BUCKET_NAME || 'unknown'}:`, error);
    res.status(500).json({ error: 'Failed to fetch videos from S3', detail: error.message });
  }
});
|
|
|
|
// Endpoint exposing client-facing configuration: page title, ffmpeg/ffprobe
// paths, and the selectable encoders/decoders with their defaults.
app.get('/api/config', (req, res) => {
  const clientConfig = {
    title: process.env.APP_TITLE || 'S3 Media Transcoder',
    ffmpegPath: JELLYFIN_FFMPEG_PATH,
    ffprobePath: JELLYFIN_FFPROBE_PATH,
    defaultVideoEncoder: 'h264_rkmpp',
    defaultVideoDecoder: 'auto',
    videoEncoders: AVAILABLE_VIDEO_ENCODERS,
    videoDecoders: AVAILABLE_VIDEO_DECODERS
  };
  res.json(clientConfig);
});
|
|
|
|
// Endpoint serving subtitle information for a cached source file.
// Without `streamIndex` it responds with the list of subtitle tracks. With
// `streamIndex` it streams that track converted to WebVTT, optionally starting at
// `ss` seconds.
app.get('/api/subtitles', async (req, res) => {
  const { bucket, key } = req.query;
  const requestedStreamIndex = typeof req.query.streamIndex === 'string' ? req.query.streamIndex.trim() : '';
  const startSeconds = Math.max(0, parseFloat(req.query.ss) || 0);

  if (!bucket) {
    return res.status(400).json({ error: 'Bucket name is required' });
  }
  if (!key) {
    return res.status(400).json({ error: 'Video key is required' });
  }
  // Repeated or bracketed query parameters are parsed into arrays/objects, which
  // would otherwise throw while the cache path is derived.
  if (typeof bucket !== 'string' || typeof key !== 'string') {
    return res.status(400).json({ error: 'Bucket name and video key must be single string values' });
  }

  try {
    // Kept inside the try block so that a failing credential lookup produces a
    // JSON error response instead of a rejected handler promise.
    const { downloadPath } = getCachePathParts(bucket, key);
    const auth = await extractS3Credentials(req);
    const s3Client = createS3Client(auth);

    await ensureSourceCached({ s3Client, bucket, key, targetPath: downloadPath, logger: console });

    const metadata = await probeFile(downloadPath);
    const tracks = getSubtitleTracks(metadata);

    if (!requestedStreamIndex) {
      return res.json({ tracks });
    }

    const numericStreamIndex = parseInt(requestedStreamIndex, 10);
    const matchedTrack = tracks.find((track) => track.index === numericStreamIndex);
    if (!matchedTrack) {
      return res.status(404).json({ error: 'Subtitle track not found' });
    }
    if (!matchedTrack.supported) {
      return res.status(415).json({ error: 'Subtitle codec is not supported for WebVTT output', codec: matchedTrack.codec });
    }

    res.setHeader('Content-Type', 'text/vtt; charset=utf-8');
    res.setHeader('Cache-Control', 'no-cache');

    const subtitleCommand = ffmpeg()
      .input(downloadPath)
      .outputOptions(['-map', `0:${numericStreamIndex}`, '-reset_timestamps', '1'])
      .format('webvtt')
      .noAudio()
      .noVideo();

    if (startSeconds > 0) {
      if (typeof subtitleCommand.seekInput === 'function') {
        subtitleCommand.seekInput(startSeconds);
      } else {
        subtitleCommand.inputOptions(['-ss', startSeconds.toString()]);
      }
    }

    subtitleCommand
      .on('error', (error) => {
        // Once output has started the status can no longer change; abort instead.
        if (!res.headersSent) {
          res.status(500).json({ error: 'Failed to extract subtitles', detail: error.message });
        } else {
          res.destroy(error);
        }
      })
      .pipe(res, { end: true });
  } catch (error) {
    console.error('Error serving subtitles:', error);
    res.status(500).json({ error: 'Failed to load subtitles', detail: error.message });
  }
});
|
|
|
|
// Endpoint that empties both on-disk caches: downloaded sources and convert output.
app.post('/api/clear-download-cache', (req, res) => {
  try {
    for (const clearCache of [clearDownloadCache, clearConvertCache]) {
      clearCache();
    }
    res.json({ message: 'Download and convert cache cleared' });
  } catch (error) {
    console.error('Error clearing download cache:', error);
    res.status(500).json({ error: 'Failed to clear download cache', detail: error.message });
  }
});
|
|
|
|
// Endpoint that stops the running transcode for a video key and tells subscribers
// it was cancelled. Expects a JSON body of the form { "key": "<object key>" }.
app.post('/api/stop-transcode', (req, res) => {
  try {
    // The body may be absent when the request carried no JSON payload.
    const key = req.body?.key;
    if (!key) {
      return res.status(400).json({ error: 'Video key is required' });
    }
    // A non-string key is a client error, not a server failure.
    if (typeof key !== 'string') {
      return res.status(400).json({ error: 'Video key must be a string' });
    }

    const progressKey = getProgressKey(key);
    if (!transcodeProcesses.has(progressKey)) {
      return res.status(404).json({ error: 'No active transcode found for this key' });
    }

    stopActiveTranscode(progressKey);
    progressMap[progressKey] = {
      status: 'cancelled',
      percent: 0,
      details: 'Transcode stopped by user',
      mp4Url: null
    };
    broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] });
    res.json({ message: 'Transcode stopped' });
  } catch (error) {
    console.error('Error stopping transcode:', error);
    res.status(500).json({ error: 'Failed to stop transcode', detail: error.message });
  }
});
|
|
|
|
app.get('/api/stream', async (req, res) => {
|
|
const bucket = req.query.bucket;
|
|
const key = req.query.key;
|
|
const requestedDecoder = typeof req.query.decoder === 'string' ? req.query.decoder.trim() : 'auto';
|
|
const requestedEncoder = typeof req.query.encoder === 'string' ? req.query.encoder.trim() : 'h264_rkmpp';
|
|
const startSeconds = parseFloat(req.query.ss) || 0;
|
|
const streamSessionId = typeof req.query.streamSessionId === 'string' && req.query.streamSessionId.trim()
|
|
? req.query.streamSessionId.trim()
|
|
: createStreamSessionId();
|
|
|
|
if (!bucket) {
|
|
return res.status(400).json({ error: 'Bucket name is required' });
|
|
}
|
|
if (!key) {
|
|
return res.status(400).json({ error: 'Video key is required' });
|
|
}
|
|
|
|
const videoEncoder = availableEncoderValues.has(requestedEncoder) ? requestedEncoder : 'h264_rkmpp';
|
|
const requestedVideoDecoder = availableDecoderValues.has(requestedDecoder) ? requestedDecoder : 'auto';
|
|
|
|
const { progressKey, downloadPath, convertPath } = getCachePathParts(bucket, key);
|
|
|
|
const auth = await extractS3Credentials(req);
|
|
const s3Client = createS3Client(auth);
|
|
|
|
try {
|
|
const replacedExistingStream = stopActiveTranscode(progressKey);
|
|
if (replacedExistingStream && startSeconds > 0) {
|
|
progressMap[progressKey] = {
|
|
...(progressMap[progressKey] || {}),
|
|
status: 'transcoding',
|
|
percent: Math.min(Math.max(Math.round((startSeconds / Math.max(progressMap[progressKey]?.duration || startSeconds || 1, 1)) * 100), 0), 100),
|
|
startSeconds,
|
|
streamSessionId,
|
|
details: `Restarting transcode from ${startSeconds.toFixed(2)}s`,
|
|
mp4Url: null
|
|
};
|
|
broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] });
|
|
}
|
|
|
|
let totalBytes = 0;
|
|
let downloadedBytes = 0;
|
|
const cacheExists = fs.existsSync(downloadPath);
|
|
|
|
if (!cacheExists) {
|
|
progressMap[progressKey] = {
|
|
status: 'downloading',
|
|
percent: 0,
|
|
downloadedBytes: 0,
|
|
totalBytes,
|
|
streamSessionId,
|
|
details: 'Downloading full source before streaming...',
|
|
mp4Url: null
|
|
};
|
|
broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] });
|
|
|
|
await ensureSourceCached({
|
|
s3Client,
|
|
bucket,
|
|
key,
|
|
targetPath: downloadPath,
|
|
logger: console,
|
|
onProgress: ({ totalBytes: nextTotalBytes, downloadedBytes: nextDownloadedBytes }) => {
|
|
totalBytes = nextTotalBytes;
|
|
downloadedBytes = nextDownloadedBytes;
|
|
const percent = totalBytes ? Math.min(100, Math.round(downloadedBytes / totalBytes * 100)) : 0;
|
|
const downloadState = {
|
|
status: 'downloading',
|
|
percent,
|
|
downloadedBytes,
|
|
totalBytes,
|
|
streamSessionId,
|
|
details: totalBytes ? `Downloading source ${percent}%` : 'Downloading source...',
|
|
mp4Url: null
|
|
};
|
|
progressMap[progressKey] = downloadState;
|
|
broadcastWs(progressKey, { type: 'progress', key, progress: downloadState });
|
|
}
|
|
});
|
|
|
|
progressMap[progressKey] = {
|
|
status: 'downloaded',
|
|
percent: 100,
|
|
downloadedBytes,
|
|
totalBytes,
|
|
streamSessionId,
|
|
details: 'Source download complete, starting real-time transcode...',
|
|
mp4Url: null
|
|
};
|
|
broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] });
|
|
} else {
|
|
const stats = fs.statSync(downloadPath);
|
|
totalBytes = stats.size;
|
|
downloadedBytes = totalBytes;
|
|
console.log(`[download] cache ready bucket=${bucket} key=${key} path=${downloadPath} bytes=${totalBytes}`);
|
|
progressMap[progressKey] = {
|
|
status: 'downloaded',
|
|
percent: 100,
|
|
downloadedBytes,
|
|
totalBytes,
|
|
streamSessionId,
|
|
details: 'Source already downloaded locally, starting real-time transcode...',
|
|
mp4Url: null
|
|
};
|
|
broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] });
|
|
}
|
|
|
|
// Probe file for duration and broadcast to clients
|
|
let sourceMetadata = null;
|
|
try {
|
|
const metadata = await probeFile(downloadPath);
|
|
sourceMetadata = metadata;
|
|
const duration = metadata.format?.duration || 0;
|
|
progressMap[progressKey] = {
|
|
...(progressMap[progressKey] || {}),
|
|
duration: parseFloat(duration) || 0,
|
|
streamSessionId
|
|
};
|
|
broadcastWs(progressKey, { type: 'duration', key, duration: parseFloat(duration) });
|
|
} catch (probeErr) {
|
|
console.error('Probe failed:', probeErr);
|
|
}
|
|
|
|
res.setHeader('Content-Type', 'video/mp4');
|
|
res.setHeader('Cache-Control', 'no-cache');
|
|
res.setHeader('Connection', 'keep-alive');
|
|
|
|
let ffmpegCommand = null;
|
|
let outputStream = null;
|
|
let cacheWriteStream = null;
|
|
|
|
// Starts the ffmpeg transcode of the locally cached source file and sends the
// MP4 output to two sinks at once: the HTTP response and the cache file at
// `convertPath`.
//
//   encoderName - video codec name handed to `.videoCodec()`; also selects the
//                 encoder-specific output options and the VAAPI / RKMPP paths.
//   decoderName - decoder passed to ffmpeg as an input `-c:v` option, or 'auto'.
//                 With 'auto' and an RKMPP encoder the decoder is looked up via
//                 `getRkmppDecoderName(sourceMetadata)`.
//
// Side effects: assigns the enclosing `ffmpegCommand`, `outputStream` and
// `cacheWriteStream` variables, registers the command in `transcodeProcesses`,
// and keeps `progressMap[progressKey]` / WebSocket subscribers up to date.
const startStream = (encoderName, decoderName) => {
  // An event is stale when the registry no longer maps this key to our session
  // (the entry was replaced by another session or has been deleted).
  const isStaleSession = () =>
    transcodeProcesses.get(progressKey)?.streamSessionId !== streamSessionId;

  // Round, then clamp into the 0..100 range.
  const clampPercent = (value) => Math.min(Math.max(Math.round(value), 0), 100);

  ensureDirectory(CONVERT_DIR);

  const streamingOptions = createFfmpegOptions(encoderName)
    .concat(getSeekFriendlyOutputOptions(encoderName, sourceMetadata));

  console.log(`[transcode] start bucket=${bucket} key=${key} encoder=${encoderName} decoder=${decoderName} source=${downloadPath} cache=${convertPath} startSeconds=${startSeconds}`);

  const command = ffmpeg()
    .input(downloadPath)
    .videoCodec(encoderName)
    .audioCodec('aac')
    .outputOptions(streamingOptions)
    .format('mp4');
  ffmpegCommand = command;

  // Apply the requested start offset on the input side.
  if (startSeconds > 0) {
    const supportsSeekInput = typeof command.seekInput === 'function';
    if (supportsSeekInput) {
      command.seekInput(startSeconds);
    } else {
      command.inputOptions(['-ss', String(startSeconds)]);
    }
  }

  // VAAPI encoders get the render device plus an upload filter.
  if (isVaapiCodec(encoderName)) {
    command.inputOptions(['-vaapi_device', '/dev/dri/renderD128']);
    command.videoFilters('format=nv12,hwupload');
  }

  let resolvedDecoderName = decoderName;
  if (decoderName === 'auto' && isRkmppCodec(encoderName)) {
    resolvedDecoderName = getRkmppDecoderName(sourceMetadata);
  }

  // Only force a decoder when one was actually resolved; 'auto' leaves the choice to ffmpeg.
  const hasExplicitDecoder = Boolean(resolvedDecoderName) && resolvedDecoderName !== 'auto';
  if (hasExplicitDecoder) {
    command.inputOptions(['-c:v', resolvedDecoderName]);
  }

  transcodeProcesses.set(progressKey, { command, streamSessionId });

  const fanOut = new PassThrough();
  outputStream = fanOut;
  const cacheFile = fs.createWriteStream(convertPath);
  cacheWriteStream = cacheFile;

  // Every chunk goes to both the client and the cache file.
  fanOut.pipe(res);
  fanOut.pipe(cacheFile);

  // Logs the size of the cache file once the write stream has flushed.
  const handleCacheFinish = () => {
    try {
      let cachedBytes = 0;
      if (fs.existsSync(convertPath)) {
        cachedBytes = fs.statSync(convertPath).size || 0;
      }
      console.log(`[transcode] cache written bucket=${bucket} key=${key} path=${convertPath} bytes=${cachedBytes}`);
    } catch (error) {
      console.warn(`[transcode] cache stat failed path=${convertPath}:`, error);
    }
  };

  // Cache write failures are logged only.
  const handleCacheError = (error) => {
    console.error(`[transcode] cache write failed bucket=${bucket} key=${key} path=${convertPath}:`, error);
  };

  // Converts an ffmpeg progress event into a 'transcoding' progress state.
  const handleProgress = (progress) => {
    if (isStaleSession()) {
      return;
    }

    const knownDuration = progressMap[progressKey]?.duration;
    const effectiveDuration = Math.max(0, (knownDuration || 0) - startSeconds);
    const timemarkSeconds = parseTimemarkToSeconds(progress.timemark || '0');
    const elapsedSeconds = isFinite(timemarkSeconds) ? timemarkSeconds : 0;
    const absoluteSeconds = startSeconds + elapsedSeconds;

    // Prefer the position relative to the probed duration; otherwise fall back
    // to the percentage reported in the progress event itself.
    let percent;
    if (effectiveDuration > 0) {
      percent = clampPercent((absoluteSeconds / (knownDuration || effectiveDuration)) * 100);
    } else {
      percent = clampPercent(progress.percent || 0);
    }

    const progressState = {
      status: 'transcoding',
      percent,
      frame: progress.frames || null,
      fps: progress.currentFps || null,
      bitrate: progress.currentKbps || null,
      timemark: progress.timemark || null,
      absoluteSeconds,
      duration: knownDuration || null,
      startSeconds,
      streamSessionId,
      details: `Streaming transcode ${percent}%`,
      mp4Url: null
    };
    progressMap[progressKey] = progressState;
    broadcastWs(progressKey, { type: 'progress', key, progress: progressState });
  };

  // Forwards ffmpeg stderr lines to the server log.
  const handleStderr = (stderrLine) => {
    if (isStaleSession()) {
      return;
    }
    console.log(`ffmpeg stderr: ${stderrLine}`);
  };

  // Marks the session finished and unregisters the command.
  const handleEnd = () => {
    if (isStaleSession()) {
      return;
    }
    console.log(`[transcode] complete bucket=${bucket} key=${key} encoder=${encoderName} cache=${convertPath}`);
    transcodeProcesses.delete(progressKey);

    const finishedState = {
      status: 'finished',
      percent: 100,
      duration: progressMap[progressKey]?.duration || null,
      startSeconds,
      streamSessionId,
      details: 'Streaming transcode complete',
      mp4Url: null
    };
    progressMap[progressKey] = finishedState;
    broadcastWs(progressKey, { type: 'progress', key, progress: finishedState });
  };

  // Records the failure, notifies subscribers and terminates the HTTP response.
  const handleError = (err) => {
    if (isStaleSession()) {
      return;
    }
    console.error(`[transcode] failed bucket=${bucket} key=${key} encoder=${encoderName} cache=${convertPath}:`, err);
    transcodeProcesses.delete(progressKey);

    const previousState = progressMap[progressKey];
    const failedState = {
      status: 'failed',
      percent: previousState?.percent || 0,
      duration: previousState?.duration || null,
      startSeconds,
      streamSessionId,
      details: err.message || 'Streaming transcode failed',
      mp4Url: null
    };
    progressMap[progressKey] = failedState;
    broadcastWs(progressKey, { type: 'progress', key, progress: failedState });

    // A JSON error body is only possible while headers are unsent; otherwise
    // the connection is torn down with the error.
    if (res.headersSent) {
      res.destroy(err);
    } else {
      res.status(500).json({ error: 'Failed to stream transcoded video', detail: err.message });
    }
  };

  cacheFile.on('finish', handleCacheFinish);
  cacheFile.on('error', handleCacheError);

  command.on('progress', handleProgress);
  command.on('stderr', handleStderr);
  command.on('end', handleEnd);
  command.on('error', handleError);

  // Start ffmpeg; its output ends the pass-through stream when it completes.
  command.pipe(fanOut, { end: true });
};
|
|
|
|
res.on('close', () => {
|
|
if (ffmpegCommand && typeof ffmpegCommand.kill === 'function') {
|
|
try {
|
|
ffmpegCommand.kill('SIGKILL');
|
|
} catch (_) {}
|
|
}
|
|
if (transcodeProcesses.get(progressKey)?.streamSessionId === streamSessionId) {
|
|
transcodeProcesses.delete(progressKey);
|
|
}
|
|
if (outputStream && !outputStream.destroyed) {
|
|
outputStream.end();
|
|
}
|
|
if (cacheWriteStream && !cacheWriteStream.destroyed) {
|
|
cacheWriteStream.end();
|
|
}
|
|
});
|
|
|
|
startStream(videoEncoder, requestedVideoDecoder);
|
|
} catch (error) {
|
|
console.error('Error in stream:', error);
|
|
if (!res.headersSent) {
|
|
res.status(500).json({ error: 'Failed to stream video', detail: error.message });
|
|
} else {
|
|
res.destroy(error);
|
|
}
|
|
}
|
|
});
|
|
|
|
// Bootstrap: start the HTTP server once Valkey initialisation has settled,
// whether it succeeded or failed.
//
// `.finally()` passes a rejection through to the promise it returns, so a
// failed initialisation would otherwise surface as an unhandled rejection
// after the server had already begun listening. The `.catch()` logs the
// failure and lets start-up continue.
initializeValkey()
  .catch((error) => {
    console.error('Valkey initialization failed:', error);
  })
  .finally(() => {
    server.listen(PORT, HOST, () => {
      console.log(`Server running on http://${HOST}:${PORT}`);
    });
  });
|