const express = require('express');
const cors = require('cors');
const dotenv = require('dotenv');
const fs = require('fs');
const os = require('os');
const path = require('path');
const http = require('http');
const WebSocket = require('ws');
const ffmpeg = require('fluent-ffmpeg');
const {
  S3Client,
  ListBucketsCommand,
  ListObjectsV2Command,
  GetObjectCommand,
  HeadObjectCommand
} = require('@aws-sdk/client-s3');
const crypto = require('crypto');
const Redis = require('ioredis');

dotenv.config();

const winston = require('winston');
require('winston-daily-rotate-file');
const util = require('util');

// ---------------------------------------------------------------------------
// Logging
// ---------------------------------------------------------------------------

const LOGS_DIR = path.join(__dirname, 'logs');
if (!fs.existsSync(LOGS_DIR)) {
  fs.mkdirSync(LOGS_DIR, { recursive: true });
}

const logger = winston.createLogger({
  level: 'info',
  format: winston.format.combine(
    winston.format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }),
    winston.format.errors({ stack: true }),
    winston.format.splat(),
    winston.format.printf(({ timestamp, level, message, stack, ...meta }) => {
      return `[${timestamp}] ${level.toUpperCase()}: ${stack || message} ${Object.keys(meta).length ? JSON.stringify(meta) : ''}`;
    })
  ),
  transports: [
    new winston.transports.Console({
      format: winston.format.combine(
        winston.format.colorize(),
        winston.format.printf(({ timestamp, level, message, stack }) => {
          return `[${timestamp}] ${level}: ${stack || message}`;
        })
      )
    }),
    new winston.transports.DailyRotateFile({
      filename: path.join(LOGS_DIR, 'application-%DATE%.log'),
      datePattern: 'YYYY-MM-DD',
      zippedArchive: true,
      maxSize: '20m',
      maxFiles: '14d'
    }),
    new winston.transports.DailyRotateFile({
      filename: path.join(LOGS_DIR, 'error-%DATE%.log'),
      datePattern: 'YYYY-MM-DD',
      level: 'error',
      zippedArchive: true,
      maxSize: '20m',
      maxFiles: '30d'
    })
  ]
});

// Route every console.* call through winston so all output reaches the
// rotating log files. Arguments are flattened with util.format first.
console.log = function(...args) {
  logger.info(util.format(...args));
};
console.info = function(...args) {
  logger.info(util.format(...args));
};
console.warn = function(...args) {
  logger.warn(util.format(...args));
};
console.error = function(...args) {
  logger.error(util.format(...args));
};

process.on('uncaughtException', (err) => {
  logger.error(`Uncaught Exception: ${err.message}`, err);
});

process.on('unhandledRejection', (reason, promise) => {
  // Format explicitly: the message has no format tokens, so passing the
  // promise and reason as extra logger arguments would not print them.
  logger.error(util.format('Unhandled Rejection at:', promise, 'reason:', reason));
});

// ---------------------------------------------------------------------------
// Application, ffmpeg and Redis setup
// ---------------------------------------------------------------------------

const CACHE_DIR = path.join(__dirname, 'cache');
if (!fs.existsSync(CACHE_DIR)) {
  fs.mkdirSync(CACHE_DIR, { recursive: true });
}

const app = express();
const PORT = process.env.PORT || 3000;
const HOST = process.env.HOST || process.env.LISTEN_ADDRESS || '0.0.0.0';
const server = http.createServer(app);

const JELLYFIN_FFMPEG_PATH = process.env.JELLYFIN_FFMPEG_PATH || '/usr/lib/jellyfin-ffmpeg/ffmpeg';
const JELLYFIN_FFPROBE_PATH = process.env.JELLYFIN_FFPROBE_PATH || '/usr/lib/jellyfin-ffmpeg/ffprobe';
if (typeof ffmpeg.setFfmpegPath === 'function') {
  ffmpeg.setFfmpegPath(JELLYFIN_FFMPEG_PATH);
}
if (typeof ffmpeg.setFfprobePath === 'function') {
  ffmpeg.setFfprobePath(JELLYFIN_FFPROBE_PATH);
}

const redisUrl = process.env.VALKEY_URL || process.env.REDIS_URL || 'redis://localhost:6379';
const redisDb = parseInt(process.env.VALKEY_DB || process.env.REDIS_DB || '0', 10);
const redisPrefix = process.env.VALKEY_PREFIX || process.env.REDIS_PREFIX || '';
const redisClient = new Redis(redisUrl, {
  db: isNaN(redisDb) ? 0 : redisDb,
  // A non-empty prefix always ends with ':'.
  keyPrefix: redisPrefix ? (redisPrefix.endsWith(':') ? redisPrefix : redisPrefix + ':') : ''
});

app.use(cors());
app.use(express.json());
app.use(express.static('public'));

// ---------------------------------------------------------------------------
// S3 configuration
// ---------------------------------------------------------------------------

const rawBucketAddress = process.env.S3_BUCKET_ADDRESS || process.env.S3_BUCKET_NAME || '';
let BUCKET_NAME = rawBucketAddress;

// When the configured bucket address is a URL, its path becomes the bucket
// name and its origin becomes the fallback endpoint.
const parsedBucketUrl = rawBucketAddress.includes('://')
  ? (() => {
    try {
      const parsed = new URL(rawBucketAddress);
      const name = parsed.pathname.replace(/^\/+/, '');
      if (name) {
        BUCKET_NAME = name;
      }
      return parsed.origin;
    } catch (err) {
      return undefined;
    }
  })()
  : undefined;

const defaultS3ClientConfig = {
  region: process.env.AWS_REGION || 'us-east-1',
  endpoint: process.env.S3_ENDPOINT || parsedBucketUrl,
  forcePathStyle: process.env.S3_FORCE_PATH_STYLE === 'true'
};

/**
 * Build an S3 client. Request-supplied credentials win; otherwise the
 * AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment pair is used when set.
 *
 * @param {{username?: string, password?: string}} [credentials]
 * @returns {S3Client}
 */
const createS3Client = (credentials) => {
  const clientConfig = { ...defaultS3ClientConfig };
  if (credentials && credentials.username && credentials.password) {
    clientConfig.credentials = {
      accessKeyId: credentials.username,
      secretAccessKey: credentials.password
    };
  } else if (process.env.AWS_ACCESS_KEY_ID && process.env.AWS_SECRET_ACCESS_KEY) {
    clientConfig.credentials = {
      accessKeyId: process.env.AWS_ACCESS_KEY_ID,
      secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY
    };
  }
  return new S3Client(clientConfig);
};

// ---------------------------------------------------------------------------
// Shared in-memory state
// ---------------------------------------------------------------------------

// Latest progress snapshot per progress key.
const progressMap = {};
// Active /api/stream ffmpeg commands per progress key.
const transcodeProcesses = new Map();
// WebSocket clients subscribed to each progress key.
const wsSubscriptions = new Map();
// In-flight S3 download promises per progress key.
const activeDownloads = new Map();

const AVAILABLE_VIDEO_ENCODERS = [
  { value: 'h264_rkmpp', label: 'H.264(RKMPP HighSpeed)' },
  { value: 'hevc_rkmpp', label: 'H.265(RKMPP HighSpeed)' },
  { value: 'libx264', label: 'H.264(Software Slow)' },
  { value: 'libx265', label: 'H.265(Software Slow)' }
];
const AVAILABLE_VIDEO_DECODERS = [
  { value: 'auto', label: 'Auto Select Decoder' }
];

// ---------------------------------------------------------------------------
// Key / cache path helpers
// ---------------------------------------------------------------------------

/** Replace every character outside [a-zA-Z0-9_-] with '_'. */
const sanitizeCacheSegment = (segment) => segment.replace(/[^a-zA-Z0-9_\-]/g, '_');

/** Sanitize each '/'-separated segment of an object key, keeping the '/' separators. */
const getProgressKey = (key) => key.split('/').map((segment) => sanitizeCacheSegment(segment)).join('/');

/** Local path where the downloaded source object for (bucket, key) is cached. */
const getSourceCachePath = (bucket, key) => {
  const safeKeySegments = key.split('/').map((segment) => sanitizeCacheSegment(segment));
  return path.join(CACHE_DIR, `s3-input-${sanitizeCacheSegment(bucket)}-${safeKeySegments.join('-')}.tmp`);
};

/**
 * Directory holding the HLS segments for (bucket, key).
 * This is the layout the segment endpoint writes to; the listing and
 * cache-clearing endpoints must use the same helper so they inspect the
 * directory that actually contains the segments.
 */
const getHlsCacheDir = (bucket, key) => {
  return path.join(CACHE_DIR, `hls-${sanitizeCacheSegment(bucket)}-${getProgressKey(key)}`);
};

const createStreamSessionId = () => `${Date.now()}-${Math.random().toString(36).slice(2, 10)}`;

// ---------------------------------------------------------------------------
// WebSocket subscription helpers
// ---------------------------------------------------------------------------

/** Subscribe a socket to a progress key and remember the key on the socket. */
const addWsClient = (progressKey, ws) => {
  if (!wsSubscriptions.has(progressKey)) {
    wsSubscriptions.set(progressKey, new Set());
  }
  wsSubscriptions.get(progressKey).add(ws);
  ws.currentKey = progressKey;
};

/** Remove a socket from its current subscription; drop the set once it is empty. */
const removeWsClient = (ws) => {
  const key = ws.currentKey;
  if (!key) return;
  const set = wsSubscriptions.get(key);
  if (!set) return;
  set.delete(ws);
  if (set.size === 0) wsSubscriptions.delete(key);
};

/** Send a JSON payload to every open socket subscribed to the key. */
const broadcastWs = (key, payload) => {
  const clients = wsSubscriptions.get(key);
  if (!clients) return;
  const message = JSON.stringify(payload);
  for (const client of clients) {
    if (client.readyState === WebSocket.OPEN) {
      client.send(message);
    }
  }
};

// ---------------------------------------------------------------------------
// ffmpeg option helpers
// ---------------------------------------------------------------------------

/** Quality/preset output options chosen by encoder family. */
const createFfmpegOptions = (encoderName) => {
  const options = [];
  if (encoderName === 'libx264' || encoderName === 'libx265') {
    options.push('-preset', 'fast', '-crf', '23', '-threads', '0');
  } else if (/_nvenc$/.test(encoderName)) {
    options.push('-preset', 'fast', '-rc:v', 'vbr_hq', '-cq', '19');
  } else if (/_qsv$/.test(encoderName)) {
    options.push('-preset', 'fast', '-global_quality', '23');
  } else if (/_vaapi$/.test(encoderName)) {
    options.push('-qp', '23');
  } else if (/_rkmpp$/.test(encoderName)) {
    options.push('-qp_init', '23', '-pix_fmt', 'nv12');
  }
  return options;
};

const isRkmppCodec = (codecName) => /_rkmpp$/.test(codecName);
const isVaapiCodec = (codecName) => /_vaapi$/.test(codecName);

const availableEncoderValues = new Set(AVAILABLE_VIDEO_ENCODERS.map((item) => item.value));
const availableDecoderValues = new Set(AVAILABLE_VIDEO_DECODERS.map((item) => item.value));

/**
 * Map the first video stream's codec to its "*_rkmpp" decoder name.
 *
 * @param {object|null} metadata ffprobe metadata (may be null).
 * @returns {string|null} Decoder name, or null when the codec is not in the map.
 */
const getRkmppDecoderName = (metadata) => {
  const videoStream = (metadata?.streams || []).find((stream) => stream.codec_type === 'video');
  const codecName = (videoStream?.codec_name || '').toLowerCase();
  const decoderMap = {
    av1: 'av1_rkmpp',
    h263: 'h263_rkmpp',
    h264: 'h264_rkmpp',
    hevc: 'hevc_rkmpp',
    mjpeg: 'mjpeg_rkmpp',
    mpeg1video: 'mpeg1_rkmpp',
    mpeg2video: 'mpeg2_rkmpp',
    mpeg4: 'mpeg4_rkmpp',
    vp8: 'vp8_rkmpp',
    vp9: 'vp9_rkmpp'
  };
  return decoderMap[codecName] || null;
};

/**
 * Parse a frame-rate string such as "30000/1001" or "25".
 *
 * @param {*} fpsText
 * @returns {number} The rate, or 0 when it cannot be parsed.
 */
const parseFpsValue = (fpsText) => {
  if (typeof fpsText !== 'string' || !fpsText.trim()) {
    return 0;
  }
  if (fpsText.includes('/')) {
    const [numeratorText, denominatorText] = fpsText.split('/');
    const numerator = parseFloat(numeratorText);
    const denominator = parseFloat(denominatorText);
    if (Number.isFinite(numerator) && Number.isFinite(denominator) && denominator !== 0) {
      return numerator / denominator;
    }
    return 0;
  }
  const numericValue = parseFloat(fpsText);
  return Number.isFinite(numericValue) ? numericValue : 0;
};

/**
 * Output options for fragmented MP4 with a fixed keyframe cadence.
 * The GOP size is twice the frame rate (clamped to 12..60 fps, default 24),
 * with a minimum of 24 frames.
 */
const getSeekFriendlyOutputOptions = (encoderName, metadata) => {
  const videoStream = (metadata?.streams || []).find((stream) => stream.codec_type === 'video') || {};
  const parsedFps = parseFpsValue(videoStream.avg_frame_rate) || parseFpsValue(videoStream.r_frame_rate) || 24;
  const normalizedFps = Math.min(Math.max(Math.round(parsedFps), 12), 60);
  const gopSize = Math.max(24, normalizedFps * 2);
  const options = [
    '-movflags', 'frag_keyframe+empty_moov+default_base_moof+faststart',
    '-frag_duration', '1000000',
    '-min_frag_duration', '1000000',
    '-g', gopSize.toString(),
    '-keyint_min', gopSize.toString()
  ];
  if (encoderName === 'libx264' || encoderName === 'libx265') {
    options.push('-sc_threshold', '0', '-force_key_frames', 'expr:gte(t,n_forced*2)');
  } else if (/_nvenc$/.test(encoderName)) {
    options.push('-forced-idr', '1', '-force_key_frames', 'expr:gte(t,n_forced*2)');
  } else if (/_qsv$/.test(encoderName)) {
    options.push('-idr_interval', '1');
  } else if (/_rkmpp$/.test(encoderName)) {
    options.push('-force_key_frames', 'expr:gte(t,n_forced*2)');
  }
  return options;
};

/** True when an ffmpeg error message matches one of the known encoder-failure patterns. */
const shouldRetryWithSoftware = (message) => {
  if (!message) return false;
  return /Cannot load libcuda\.so\.1|Could not open encoder before EOF|Error while opening encoder|Operation not permitted|Invalid argument|mpp_create|rkmpp/i.test(message);
};

/** Promise wrapper around ffmpeg.ffprobe. */
const probeFile = (filePath) => {
  return new Promise((resolve, reject) => {
    ffmpeg.ffprobe(filePath, (err, metadata) => {
      if (err) reject(err);
      else resolve(metadata);
    });
  });
};

/**
 * Convert an "HH:MM:SS.xx" timemark to seconds. Input that does not have
 * three ':'-separated parts is parsed as a plain number.
 *
 * @param {*} timemark
 * @returns {number} Seconds, or 0 when the value cannot be parsed.
 */
const parseTimemarkToSeconds = (timemark) => {
  if (typeof timemark !== 'string' || !timemark.trim()) {
    return 0;
  }
  const parts = timemark.trim().split(':');
  if (parts.length !== 3) {
    const numericValue = parseFloat(timemark);
    return Number.isFinite(numericValue) ? numericValue : 0;
  }
  const [hoursPart, minutesPart, secondsPart] = parts;
  const hours = parseInt(hoursPart, 10);
  const minutes = parseInt(minutesPart, 10);
  const seconds = parseFloat(secondsPart);
  if (![hours, minutes, seconds].every(Number.isFinite)) {
    return 0;
  }
  return (hours * 3600) + (minutes * 60) + seconds;
};

/**
 * Kill the /api/stream ffmpeg command registered for a progress key.
 * Listeners are removed first so the killed command does not publish
 * 'error'/'end' state for the stream that is being replaced.
 *
 * @returns {boolean} true when a command was registered for the key.
 */
const stopActiveTranscode = (progressKey) => {
  const activeProcess = transcodeProcesses.get(progressKey);
  if (!activeProcess?.command) {
    return false;
  }
  try {
    if (typeof activeProcess.command.removeAllListeners === 'function') {
      activeProcess.command.removeAllListeners('progress');
      activeProcess.command.removeAllListeners('stderr');
      activeProcess.command.removeAllListeners('end');
      activeProcess.command.removeAllListeners('error');
    }
    if (typeof activeProcess.command.kill === 'function') {
      activeProcess.command.kill('SIGKILL');
    }
  } catch (killError) {
    console.warn(`Failed to kill transcode process for ${progressKey}:`, killError);
  } finally {
    transcodeProcesses.delete(progressKey);
  }
  return true;
};

/**
 * Resolve S3 credentials for a request. A session id (header, body or query)
 * is looked up in Redis first; otherwise explicit username/password values
 * from headers, body or query are used.
 *
 * @returns {Promise<{username: string, password: string}>}
 */
const extractS3Credentials = async (req) => {
  const query = req.query || {};
  const sessionId = req.headers['x-session-id'] || req.body?.sessionId || query.sessionId || '';
  if (sessionId) {
    try {
      const cachedCreds = await redisClient.get(`session:${sessionId}`);
      if (cachedCreds) {
        const creds = JSON.parse(cachedCreds);
        return { username: creds.username, password: creds.password };
      }
    } catch (e) {
      console.error('Session retrieval error:', e);
    }
  }
  const username = req.headers['x-s3-username'] || req.body?.username || query.username || query.accessKeyId || '';
  const password = req.headers['x-s3-password'] || req.body?.password || query.password || query.secretAccessKey || '';
  return {
    username: typeof username === 'string' ? username.trim() : '',
    password: typeof password === 'string' ? password : ''
  };
};

// ---------------------------------------------------------------------------
// WebSocket server: clients subscribe to progress updates for one key
// ---------------------------------------------------------------------------

const wss = new WebSocket.Server({ server });
wss.on('connection', (ws) => {
  ws.on('message', (raw) => {
    try {
      const message = JSON.parse(raw.toString());
      if (message.type === 'subscribe' && typeof message.key === 'string') {
        const progressKey = getProgressKey(message.key);
        if (ws.currentKey && ws.currentKey !== progressKey) {
          removeWsClient(ws);
        }
        addWsClient(progressKey, ws);

        // Replay the latest known state so a late subscriber is up to date.
        const currentProgress = progressMap[progressKey];
        if (currentProgress) {
          if (typeof currentProgress.duration === 'number' && currentProgress.duration > 0) {
            ws.send(JSON.stringify({ type: 'duration', key: message.key, duration: currentProgress.duration }));
          }
          ws.send(JSON.stringify({ type: 'progress', key: message.key, progress: currentProgress }));
          if (currentProgress.status === 'finished' && currentProgress.mp4Url) {
            ws.send(JSON.stringify({ type: 'ready', key: message.key, mp4Url: currentProgress.mp4Url }));
          }
        }
      }
    } catch (error) {
      console.error('WebSocket parse error:', error);
    }
  });
  ws.on('close', () => removeWsClient(ws));
});

/**
 * Make sure the S3 object is present at tmpInputPath.
 *
 * An existing local file is kept when its size equals the object's
 * ContentLength (HEAD request); otherwise the object is downloaded to
 * "<tmpInputPath>.downloading" and renamed into place on completion.
 * Progress is published to progressMap and to WebSocket subscribers.
 *
 * @throws When the download fails; the partial ".downloading" file is removed.
 */
const ensureS3Downloaded = async (s3Client, bucket, key, tmpInputPath, progressKey, streamSessionId) => {
  if (activeDownloads.has(progressKey)) {
    try {
      await activeDownloads.get(progressKey);
    } catch (err) {
      // Ignore error and retry if previous failed
    }
  }

  let shouldDownload = true;
  let s3Metadata = null;
  if (fs.existsSync(tmpInputPath)) {
    const stats = fs.statSync(tmpInputPath);
    const localSize = stats.size;
    try {
      const headCommand = new HeadObjectCommand({ Bucket: bucket, Key: key });
      s3Metadata = await s3Client.send(headCommand);
      if (s3Metadata.ContentLength === localSize) {
        shouldDownload = false;
        console.log(`[Cache] Verified ${key}: Local size matches S3 (${localSize} bytes).`);
      } else {
        console.log(`[Cache] Mismatch for ${key}: Local ${localSize} vs S3 ${s3Metadata.ContentLength}. Re-downloading.`);
      }
    } catch (err) {
      // Verification failed: fall through and download again.
      console.error(`[Cache] Failed to verify S3 metadata for ${key}:`, err.message);
    }
  }

  if (!shouldDownload) {
    progressMap[progressKey] = {
      status: 'downloaded',
      percent: 100,
      downloadedBytes: s3Metadata.ContentLength,
      totalBytes: s3Metadata.ContentLength,
      streamSessionId,
      details: 'Source cached locally and verified against S3...',
      mp4Url: null
    };
    broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] });
    return;
  }

  const downloadPromise = (async () => {
    const command = new GetObjectCommand({ Bucket: bucket, Key: key });
    const response = await s3Client.send(command);
    const totalBytes = response.ContentLength || 0;
    const s3Stream = response.Body;
    let downloadedBytes = 0;
    const downloadingPath = tmpInputPath + '.downloading';

    progressMap[progressKey] = {
      status: 'downloading',
      percent: 0,
      downloadedBytes: 0,
      totalBytes,
      streamSessionId,
      details: 'Downloading full source...',
      mp4Url: null
    };
    broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] });

    await new Promise((resolve, reject) => {
      const writeStream = fs.createWriteStream(downloadingPath);

      // On failure, tear down both streams so the file descriptor and the
      // S3 response are released before the caller removes the partial file.
      const fail = (err) => {
        if (typeof s3Stream.destroy === 'function') {
          s3Stream.destroy();
        }
        writeStream.destroy();
        reject(err);
      };

      s3Stream.on('data', (chunk) => {
        downloadedBytes += chunk.length;
        const percent = totalBytes ? Math.min(100, Math.round((downloadedBytes / totalBytes) * 100)) : 0;
        const downloadState = {
          status: 'downloading',
          percent,
          downloadedBytes,
          totalBytes,
          streamSessionId,
          details: totalBytes ? `Downloading source ${percent}%` : 'Downloading source...',
          mp4Url: null
        };
        progressMap[progressKey] = downloadState;
        broadcastWs(progressKey, { type: 'progress', key, progress: downloadState });
      });
      s3Stream.on('error', fail);
      writeStream.on('error', fail);
      writeStream.on('finish', () => {
        try {
          fs.renameSync(downloadingPath, tmpInputPath);
          resolve();
        } catch (e) {
          reject(e);
        }
      });
      s3Stream.pipe(writeStream);
    });

    progressMap[progressKey] = {
      status: 'downloaded',
      percent: 100,
      downloadedBytes,
      totalBytes,
      streamSessionId,
      details: 'Source download complete...',
      mp4Url: null
    };
    broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] });
  })();

  activeDownloads.set(progressKey, downloadPromise);
  try {
    await downloadPromise;
  } catch (err) {
    if (fs.existsSync(tmpInputPath + '.downloading')) {
      fs.rmSync(tmpInputPath + '.downloading', { force: true });
    }
    throw err;
  } finally {
    activeDownloads.delete(progressKey);
  }
};

// ---------------------------------------------------------------------------
// REST endpoints
// ---------------------------------------------------------------------------

app.get('/api/buckets', async (req, res) => {
  try {
    const auth = await extractS3Credentials(req);
    const s3Client = createS3Client(auth);
    const command = new ListBucketsCommand({});
    const response = await s3Client.send(command);
    const buckets = response.Buckets || [];
    res.json({ buckets });
  } catch (error) {
    console.error('Error listing buckets:', error);
    res.status(500).json({ error: 'Failed to list buckets', detail: error.message });
  }
});

// Endpoint to list videos in the bucket
app.get('/api/videos', async (req, res) => {
  try {
    const bucket = req.query.bucket || BUCKET_NAME;
    if (!bucket) {
      return res.status(400).json({ error: 'Bucket name is required' });
    }

    const allObjects = [];
    const auth = await extractS3Credentials(req);
    const s3Client = createS3Client(auth);

    // Page through the listing until S3 reports no further results.
    let continuationToken;
    do {
      const command = new ListObjectsV2Command({
        Bucket: bucket,
        ContinuationToken: continuationToken,
      });
      const response = await s3Client.send(command);
      allObjects.push(...(response.Contents || []));
      continuationToken = response.IsTruncated ? response.NextContinuationToken : undefined;
    } while (continuationToken);

    // Filter for a broader set of common video formats
    const videoExtensions = [
      '.3gp', '.3g2', '.asf', '.avi', '.divx', '.flv', '.m2ts', '.m2v', '.m4v',
      '.mkv', '.mov', '.mp4', '.mpeg', '.mpg', '.mts', '.mxf', '.ogm', '.ogv',
      '.qt', '.rm', '.rmvb', '.ts', '.vob', '.vro', '.webm', '.wmv'
    ];

    const videos = allObjects
      .map(item => item.Key)
      .filter(key => {
        if (!key) return false;
        const lowerKey = key.toLowerCase();
        return videoExtensions.some(ext => lowerKey.endsWith(ext));
      })
      .map(key => {
        // Same directory the segment endpoint writes to.
        const hlsDir = getHlsCacheDir(bucket, key);
        return {
          key: key,
          hasTranscodeCache: fs.existsSync(hlsDir)
        };
      });

    res.json({ videos });
  } catch (error) {
    console.error('Error fetching videos:', error);
    res.status(500).json({ error: 'Failed to fetch videos from S3', detail: error.message });
  }
});

app.post('/api/login', async (req, res) => {
  const { username, password } = req.body;
  if (!username || !password) return res.status(400).json({ error: 'Missing credentials' });
  try {
    // Listing buckets doubles as the credential check.
    const s3Client = createS3Client({ username, password });
    const command = new ListBucketsCommand({});
    const response = await s3Client.send(command);
    const buckets = response.Buckets || [];

    const sessionId = crypto.randomBytes(32).toString('hex');
    // Session entries expire after 7 days.
    await redisClient.set(`session:${sessionId}`, JSON.stringify({ username, password }), 'EX', 7 * 24 * 3600);
    res.json({ success: true, sessionId, username, buckets });
  } catch (error) {
    console.error('Login error:', error);
    res.status(401).json({ error: 'Login failed', detail: error.message });
  }
});

app.post('/api/logout', async (req, res) => {
  const sessionId = req.headers['x-session-id'] || req.body?.sessionId;
  if (sessionId) {
    try {
      await redisClient.del(`session:${sessionId}`);
    } catch (e) {
      // Logout stays best-effort, but the failure is no longer silent.
      console.warn('Failed to delete session on logout:', e);
    }
  }
  res.json({ success: true });
});

app.get('/api/config', (req, res) => {
  const title = process.env.APP_TITLE || 'S3 Media Transcoder';
  res.json({
    title,
    ffmpegPath: JELLYFIN_FFMPEG_PATH,
    ffprobePath: JELLYFIN_FFPROBE_PATH,
    defaultVideoEncoder: 'h264_rkmpp',
    defaultVideoDecoder: 'auto',
    videoEncoders: AVAILABLE_VIDEO_ENCODERS,
    videoDecoders: AVAILABLE_VIDEO_DECODERS
  });
});

app.post('/api/clear-video-transcode-cache', async (req, res) => {
  try {
    const { bucket, key } = req.body;
    if (!bucket || !key) {
      return res.status(400).json({ error: 'Bucket and key are required' });
    }
    // Same directory the segment endpoint writes to.
    const hlsDir = getHlsCacheDir(bucket, key);
    if (fs.existsSync(hlsDir)) {
      fs.rmSync(hlsDir, { recursive: true, force: true });
    }
    res.json({ message: 'Transcode cache cleared for video' });
  } catch (error) {
    console.error('Error clearing video transcode cache:', error);
    res.status(500).json({ error: 'Failed to clear transcode cache', detail: error.message });
  }
});

app.post('/api/stop-transcode', (req, res) => {
  try {
    const { key } = req.body;
    if (!key) {
      return res.status(400).json({ error: 'Video key is required' });
    }
    const progressKey = getProgressKey(key);
    if (!transcodeProcesses.has(progressKey)) {
      return res.status(404).json({ error: 'No active transcode found for this key' });
    }
    stopActiveTranscode(progressKey);
    progressMap[progressKey] = {
      status: 'cancelled',
      percent: 0,
      details: 'Transcode stopped by user',
      mp4Url: null
    };
    broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] });
    res.json({ message: 'Transcode stopped' });
  } catch (error) {
    console.error('Error stopping transcode:', error);
    res.status(500).json({ error: 'Failed to stop transcode', detail: error.message });
  }
});

// ---------------------------------------------------------------------------
// HLS endpoints
// ---------------------------------------------------------------------------

const HLS_SEGMENT_TIME = 6;

/**
 * Poll (every 200 ms) until ffmpeg's temp.m3u8 lists the requested segment.
 *
 * @returns {Promise<boolean>} true when the segment is listed, or when the
 *   playlist has ended and the segment file exists; false on timeout or when
 *   the playlist ended without that segment.
 */
const waitForSegment = async (hlsDir, segIndex, timeoutMs = 45000) => {
  const start = Date.now();
  const segPath = path.join(hlsDir, `segment_${segIndex}.ts`);
  const m3u8Path = path.join(hlsDir, `temp.m3u8`);
  while (Date.now() - start < timeoutMs) {
    if (fs.existsSync(m3u8Path)) {
      const m3u8Content = fs.readFileSync(m3u8Path, 'utf8');
      if (m3u8Content.includes(`segment_${segIndex}.ts`)) {
        return true;
      }
      if (m3u8Content.includes(`#EXT-X-ENDLIST`)) {
        if (fs.existsSync(segPath)) return true;
        return false;
      }
    }
    await new Promise(r => setTimeout(r, 200));
  }
  return false;
};

app.get('/api/hls/playlist.m3u8', async (req, res) => {
  const bucket = req.query.bucket;
  const key = req.query.key;
  // Repeated query parameters arrive as arrays; only plain strings are valid.
  if (typeof bucket !== 'string' || typeof key !== 'string' || !bucket || !key) {
    return res.status(400).send('Bad Request');
  }

  const tmpInputPath = getSourceCachePath(bucket, key);
  const auth = await extractS3Credentials(req);
  const s3Client = createS3Client(auth);
  const progressKey = getProgressKey(key);
  const streamSessionId = createStreamSessionId();

  try {
    await ensureS3Downloaded(s3Client, bucket, key, tmpInputPath, progressKey, streamSessionId);
  } catch (err) {
    console.error('S3 Download Failed:', err);
    return res.status(500).send('S3 Download Failed');
  }

  let duration = 0;
  try {
    const metadata = await probeFile(tmpInputPath);
    duration = parseFloat(metadata.format?.duration || 0);
  } catch (err) {
    console.warn(`Probe failed for ${key}; using fallback duration:`, err.message);
  }
  // Unknown duration: advertise one hour of segments.
  if (duration <= 0) duration = 3600;

  const totalSegments = Math.ceil(duration / HLS_SEGMENT_TIME);
  const encoderParam = encodeURIComponent(req.query.encoder || 'h264_rkmpp');
  const decoderParam = encodeURIComponent(req.query.decoder || 'auto');

  let m3u8 = `#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-TARGETDURATION:${HLS_SEGMENT_TIME}\n#EXT-X-MEDIA-SEQUENCE:0\n#EXT-X-PLAYLIST-TYPE:VOD\n`;
  for (let i = 0; i < totalSegments; i++) {
    let segDur = HLS_SEGMENT_TIME;
    // The last segment carries the remainder of the duration.
    if (i === totalSegments - 1 && duration % HLS_SEGMENT_TIME !== 0) {
      segDur = (duration % HLS_SEGMENT_TIME) || HLS_SEGMENT_TIME;
    }
    m3u8 += `#EXTINF:${segDur.toFixed(6)},\nsegment.ts?bucket=${encodeURIComponent(bucket)}&key=${encodeURIComponent(key)}&seg=${i}&encoder=${encoderParam}&decoder=${decoderParam}\n`;
  }
  m3u8 += `#EXT-X-ENDLIST\n`;

  res.setHeader('Content-Type', 'application/vnd.apple.mpegurl');
  res.setHeader('Cache-Control', 'no-cache');
  res.send(m3u8);
});

// Active HLS ffmpeg commands per progress key: { command, currentSeg, lastActive }.
const hlsProcesses = new Map();

// Watchdog: Kill HLS transcoding if the frontend stops requesting segments
setInterval(() => {
  const now = Date.now();
  for (const [key, processInfo] of hlsProcesses.entries()) {
    if (processInfo.lastActive && now - processInfo.lastActive > 30000) {
      try {
        if (processInfo.command) {
          processInfo.command.kill('SIGKILL');
        }
      } catch (e) {
        console.warn(`Failed to kill inactive HLS process for ${key}:`, e);
      }
      hlsProcesses.delete(key);
      console.log(`[Watchdog] Terminated inactive HLS transcode for ${key}`);
    }
  }
}, 10000);

app.get('/api/hls/segment.ts', async (req, res) => {
  const bucket = req.query.bucket;
  const key = req.query.key;
  const seg = Number.parseInt(req.query.seg || '0', 10);
  const requestedEncoder = req.query.encoder || 'h264_rkmpp';
  const requestedDecoder = req.query.decoder || 'auto';
  if (typeof bucket !== 'string' || typeof key !== 'string' || !bucket || !key || Number.isNaN(seg) || seg < 0) {
    return res.status(400).send('Bad Request');
  }

  const tmpInputPath = getSourceCachePath(bucket, key);
  const progressKey = getProgressKey(key);
  const hlsDir = getHlsCacheDir(bucket, key);
  if (!fs.existsSync(hlsDir)) fs.mkdirSync(hlsDir, { recursive: true });
  const targetSegPath = path.join(hlsDir, `segment_${seg}.ts`);

  let currentProcess = hlsProcesses.get(progressKey);
  if (currentProcess) {
    // Any segment request counts as activity for the watchdog.
    currentProcess.lastActive = Date.now();
  }

  const checkIsCachedAndCompleted = () => {
    if (!fs.existsSync(targetSegPath)) return false;
    const m3u8Path = path.join(hlsDir, `temp.m3u8`);
    if (fs.existsSync(m3u8Path) && fs.readFileSync(m3u8Path, 'utf8').includes(`segment_${seg}.ts`)) return true;
    // A file more than 3 segments away from the running encoder's position
    // is treated as complete.
    if (currentProcess && Math.abs((currentProcess.currentSeg || 0) - seg) > 3) return true;
    // If there's no active process, any existing file is from a past complete run
    if (!currentProcess) return true;
    return false;
  };

  if (checkIsCachedAndCompleted()) {
    if (currentProcess) currentProcess.currentSeg = Math.max(currentProcess.currentSeg, seg);
    res.setHeader('Content-Type', 'video/MP2T');
    return res.sendFile(targetSegPath);
  }

  // Start (or restart) ffmpeg when nothing is running, or when the request
  // is behind the encoder's position or more than 4 segments ahead of it.
  const needsNewProcess = !currentProcess ||
    (!fs.existsSync(targetSegPath) &&
      (seg < (currentProcess.currentSeg || 0) || seg > (currentProcess.currentSeg || 0) + 4));

  if (needsNewProcess) {
    if (currentProcess && currentProcess.command) {
      try {
        currentProcess.command.kill('SIGKILL');
      } catch (e) {
        console.warn(`Failed to kill previous HLS process for ${progressKey}:`, e);
      }
    }

    const startTime = Math.max(0, seg * HLS_SEGMENT_TIME);
    let sourceMetadata = null;
    try {
      sourceMetadata = await probeFile(tmpInputPath);
    } catch (e) {
      console.warn(`Probe failed for ${progressKey}:`, e.message);
    }

    const encoderName = availableEncoderValues.has(requestedEncoder) ? requestedEncoder : 'h264_rkmpp';
    const decoderName = availableDecoderValues.has(requestedDecoder) ? requestedDecoder : 'auto';

    // Remove the old playlist so waitForSegment only sees this run's output.
    const m3u8Path = path.join(hlsDir, `temp.m3u8`);
    if (fs.existsSync(m3u8Path)) fs.unlinkSync(m3u8Path);

    const ffmpegCommand = ffmpeg().input(tmpInputPath);
    if (startTime > 0) ffmpegCommand.seekInput(startTime);
    ffmpegCommand.videoCodec(encoderName).audioCodec('aac');

    if (isVaapiCodec(encoderName)) {
      ffmpegCommand.inputOptions(['-vaapi_device', '/dev/dri/renderD128']).videoFilters('format=nv12,hwupload');
    }

    const resolvedDecoderName = decoderName === 'auto' && isRkmppCodec(encoderName)
      ? getRkmppDecoderName(sourceMetadata)
      : decoderName;
    if (resolvedDecoderName && resolvedDecoderName !== 'auto') ffmpegCommand.inputOptions(['-c:v', resolvedDecoderName]);

    const segmentFilename = path.join(hlsDir, `segment_%d.ts`);
    const hlsOptions = createFfmpegOptions(encoderName).concat([
      '-f', 'hls',
      '-hls_time', HLS_SEGMENT_TIME.toString(),
      '-hls_list_size', '0',
      '-hls_segment_filename', segmentFilename,
      '-start_number', seg.toString(),
      '-copyts',
      '-avoid_negative_ts', 'disabled',
      '-muxdelay', '0',
      '-muxpreload', '0'
    ]);
    ffmpegCommand.outputOptions(hlsOptions).output(m3u8Path);

    ffmpegCommand.on('error', (err) => {
      console.error('HLS FFmpeg Error:', err.message);
    });

    ffmpegCommand.on('progress', (progress) => {
      const timemarkSeconds = parseTimemarkToSeconds(progress.timemark || '0');
      const absoluteSeconds = startTime + (isFinite(timemarkSeconds) ? timemarkSeconds : 0);
      const totalDuration = parseFloat(sourceMetadata?.format?.duration || 0);
      let percent = 0;
      if (totalDuration > 0) {
        percent = Math.min(Math.max(Math.round((absoluteSeconds / totalDuration) * 100), 0), 100);
      }
      const progressState = {
        status: 'transcoding',
        percent,
        frame: progress.frames || null,
        fps: progress.currentFps || null,
        bitrate: progress.currentKbps || null,
        timemark: progress.timemark || null,
        absoluteSeconds,
        duration: totalDuration || null,
        startSeconds: startTime,
        details: `处理进度 ${percent}%`,
        mp4Url: null
      };
      progressMap[progressKey] = progressState;
      broadcastWs(progressKey, { type: 'progress', key, progress: progressState });
      console.log(`[FFmpeg] ${progressKey} | ${progress.timemark} | ${progress.currentFps}fps | ${progress.currentKbps}kbps | ${percent}%`);
    });

    ffmpegCommand.on('end', () => {
      console.log(`[FFmpeg] ${progressKey} HLS transcode completed.`);
    });

    ffmpegCommand.run();
    currentProcess = { command: ffmpegCommand, currentSeg: seg, lastActive: Date.now() };
    hlsProcesses.set(progressKey, currentProcess);
  }

  const ready = await waitForSegment(hlsDir, seg);
  if (!ready) {
    return res.status(500).send('Segment generation timeout');
  }

  if (currentProcess) {
    currentProcess.currentSeg = Math.max(currentProcess.currentSeg, seg);
    currentProcess.lastActive = Date.now();
  }
  res.setHeader('Content-Type', 'video/MP2T');
  res.sendFile(targetSegPath);
});

// ---------------------------------------------------------------------------
// Progressive MP4 streaming endpoint
// ---------------------------------------------------------------------------

app.get('/api/stream', async (req, res) => {
  const bucket = req.query.bucket;
  const key = req.query.key;
  const requestedDecoder = typeof req.query.decoder === 'string' ? req.query.decoder.trim() : 'auto';
  const requestedEncoder = typeof req.query.encoder === 'string' ? req.query.encoder.trim() : 'h264_rkmpp';
  const startSeconds = parseFloat(req.query.ss) || 0;
  const streamSessionId = typeof req.query.streamSessionId === 'string' && req.query.streamSessionId.trim()
    ? req.query.streamSessionId.trim()
    : createStreamSessionId();

  // Repeated query parameters arrive as arrays; only plain strings are valid.
  if (!bucket || typeof bucket !== 'string') {
    return res.status(400).json({ error: 'Bucket name is required' });
  }
  if (!key || typeof key !== 'string') {
    return res.status(400).json({ error: 'Video key is required' });
  }

  const videoEncoder = availableEncoderValues.has(requestedEncoder) ? requestedEncoder : 'h264_rkmpp';
  const requestedVideoDecoder = availableDecoderValues.has(requestedDecoder) ? requestedDecoder : 'auto';

  const progressKey = getProgressKey(key);
  const tmpInputPath = getSourceCachePath(bucket, key);

  const auth = await extractS3Credentials(req);
  const s3Client = createS3Client(auth);

  try {
    // A new request for the same key replaces any stream already running.
    const replacedExistingStream = stopActiveTranscode(progressKey);
    if (replacedExistingStream && startSeconds > 0) {
      progressMap[progressKey] = {
        ...(progressMap[progressKey] || {}),
        status: 'transcoding',
        percent: Math.min(Math.max(Math.round((startSeconds / Math.max(progressMap[progressKey]?.duration || startSeconds || 1, 1)) * 100), 0), 100),
        startSeconds,
        streamSessionId,
        details: `Restarting transcode from ${startSeconds.toFixed(2)}s`,
        mp4Url: null
      };
      broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] });
    }

    await ensureS3Downloaded(s3Client, bucket, key, tmpInputPath, progressKey, streamSessionId);

    // Probe file for duration and broadcast to clients
    let sourceMetadata = null;
    try {
      const metadata = await probeFile(tmpInputPath);
      sourceMetadata = metadata;
      const duration = metadata.format?.duration || 0;
      progressMap[progressKey] = {
        ...(progressMap[progressKey] || {}),
        duration: parseFloat(duration) || 0,
        streamSessionId
      };
      broadcastWs(progressKey, { type: 'duration', key, duration: parseFloat(duration) });
    } catch (probeErr) {
      console.error('Probe failed:', probeErr);
    }

    res.setHeader('Content-Type', 'video/mp4');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');

    let ffmpegCommand = null;

    // Every event handler below first checks that this request still owns
    // the registered transcode, so a replaced stream cannot publish state.
    const isCurrentSession = () => transcodeProcesses.get(progressKey)?.streamSessionId === streamSessionId;

    const startStream = (encoderName, decoderName) => {
      const streamingOptions = createFfmpegOptions(encoderName).concat(getSeekFriendlyOutputOptions(encoderName, sourceMetadata));
      ffmpegCommand = ffmpeg()
        .input(tmpInputPath)
        .videoCodec(encoderName)
        .audioCodec('aac')
        .outputOptions(streamingOptions)
        .format('mp4');

      if (startSeconds > 0) {
        if (typeof ffmpegCommand.seekInput === 'function') {
          ffmpegCommand.seekInput(startSeconds);
        } else {
          ffmpegCommand.inputOptions(['-ss', startSeconds.toString()]);
        }
      }

      if (isVaapiCodec(encoderName)) {
        ffmpegCommand
          .inputOptions(['-vaapi_device', '/dev/dri/renderD128'])
          .videoFilters('format=nv12,hwupload');
      }

      const resolvedDecoderName = decoderName === 'auto' && isRkmppCodec(encoderName)
        ? getRkmppDecoderName(sourceMetadata)
        : decoderName;
      if (resolvedDecoderName && resolvedDecoderName !== 'auto') {
        ffmpegCommand.inputOptions(['-c:v', resolvedDecoderName]);
      }

      transcodeProcesses.set(progressKey, { command: ffmpegCommand, streamSessionId });

      ffmpegCommand
        .on('progress', (progress) => {
          if (!isCurrentSession()) {
            return;
          }
          const effectiveDuration = Math.max(0, (progressMap[progressKey]?.duration || 0) - startSeconds);
          const timemarkSeconds = parseTimemarkToSeconds(progress.timemark || '0');
          const absoluteSeconds = startSeconds + (isFinite(timemarkSeconds) ? timemarkSeconds : 0);
          const percent = effectiveDuration > 0
            ? Math.min(Math.max(Math.round((absoluteSeconds / (progressMap[progressKey]?.duration || effectiveDuration)) * 100), 0), 100)
            : Math.min(Math.max(Math.round(progress.percent || 0), 0), 100);
          const progressState = {
            status: 'transcoding',
            percent,
            frame: progress.frames || null,
            fps: progress.currentFps || null,
            bitrate: progress.currentKbps || null,
            timemark: progress.timemark || null,
            absoluteSeconds,
            duration: progressMap[progressKey]?.duration || null,
            startSeconds,
            streamSessionId,
            details: `Streaming transcode ${percent}%`,
            mp4Url: null
          };
          progressMap[progressKey] = progressState;
          broadcastWs(progressKey, { type: 'progress', key, progress: progressState });
        })
        .on('stderr', (stderrLine) => {
          if (!isCurrentSession()) {
            return;
          }
          console.log(`ffmpeg stderr: ${stderrLine}`);
        })
        .on('end', () => {
          if (!isCurrentSession()) {
            return;
          }
          transcodeProcesses.delete(progressKey);
          progressMap[progressKey] = {
            status: 'finished',
            percent: 100,
            duration: progressMap[progressKey]?.duration || null,
            startSeconds,
            streamSessionId,
            details: 'Streaming transcode complete',
            mp4Url: null
          };
          broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] });
        })
        .on('error', (err) => {
          if (!isCurrentSession()) {
            return;
          }
          transcodeProcesses.delete(progressKey);
          const failedState = {
            status: 'failed',
            percent: progressMap[progressKey]?.percent || 0,
            duration: progressMap[progressKey]?.duration || null,
            startSeconds,
            streamSessionId,
            details: err.message || 'Streaming transcode failed',
            mp4Url: null
          };
          progressMap[progressKey] = failedState;
          broadcastWs(progressKey, { type: 'progress', key, progress: failedState });
          if (!res.headersSent) {
            res.status(500).json({ error: 'Failed to stream transcoded video', detail: err.message });
          } else {
            res.destroy(err);
          }
        });

      ffmpegCommand.pipe(res, { end: true });
    };

    // Client went away: stop encoding and release the registry entry if it
    // still belongs to this request.
    res.on('close', () => {
      if (ffmpegCommand && typeof ffmpegCommand.kill === 'function') {
        try {
          ffmpegCommand.kill('SIGKILL');
        } catch (_) {
          // The command may already have exited; nothing to clean up.
        }
      }
      if (transcodeProcesses.get(progressKey)?.streamSessionId === streamSessionId) {
        transcodeProcesses.delete(progressKey);
      }
    });

    startStream(videoEncoder, requestedVideoDecoder);
  } catch (error) {
    console.error('Error in stream:', error);
    if (!res.headersSent) {
      res.status(500).json({ error: 'Failed to stream video', detail: error.message });
    } else {
      res.destroy(error);
    }
  }
});

server.listen(PORT, HOST, () => {
  console.log(`Server running on http://${HOST}:${PORT}`);
});