Files
Media-Coding-Web/server.js
CN-JS-HuiBai 3b65c306de 优化布局
2026-04-04 14:02:49 +08:00

1117 lines
44 KiB
JavaScript

const express = require('express');
const cors = require('cors');
const dotenv = require('dotenv');
const fs = require('fs');
const os = require('os');
const path = require('path');
const http = require('http');
const WebSocket = require('ws');
const ffmpeg = require('fluent-ffmpeg');
const { S3Client, ListBucketsCommand, ListObjectsV2Command, GetObjectCommand, HeadObjectCommand } = require('@aws-sdk/client-s3');
const crypto = require('crypto');
const Redis = require('ioredis');
dotenv.config();
const winston = require('winston');
require('winston-daily-rotate-file');
const util = require('util');
// Folder (next to this script) that receives the rotating log files.
// It is created on start-up when it is not there yet.
const LOGS_DIR = path.join(__dirname, 'logs');
if (fs.existsSync(LOGS_DIR) === false) {
  fs.mkdirSync(LOGS_DIR, { recursive: true });
}
// Application-wide winston logger.
// Sinks: a colourised console transport, an "application-%DATE%.log" rotating
// file for every level from `info` up, and an "error-%DATE%.log" rotating file
// restricted to the `error` level.
const logger = (() => {
  const { format, transports } = winston;

  // Line layout for the file transports; any extra metadata is appended as JSON.
  const renderFileLine = ({ timestamp, level, message, stack, ...meta }) => {
    return `[${timestamp}] ${level.toUpperCase()}: ${stack || message} ${Object.keys(meta).length ? JSON.stringify(meta) : ''}`;
  };

  // Line layout for the console; metadata is intentionally not printed here.
  const renderConsoleLine = ({ timestamp, level, message, stack }) => {
    return `[${timestamp}] ${level}: ${stack || message}`;
  };

  // Settings shared by both rotating-file transports.
  const rotationDefaults = {
    datePattern: 'YYYY-MM-DD',
    zippedArchive: true,
    maxSize: '20m'
  };

  return winston.createLogger({
    level: 'info',
    format: format.combine(
      format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }),
      format.errors({ stack: true }),
      format.splat(),
      format.printf(renderFileLine)
    ),
    transports: [
      new transports.Console({
        format: format.combine(
          format.colorize(),
          format.printf(renderConsoleLine)
        )
      }),
      new transports.DailyRotateFile({
        ...rotationDefaults,
        filename: path.join(LOGS_DIR, 'application-%DATE%.log'),
        maxFiles: '14d'
      }),
      new transports.DailyRotateFile({
        ...rotationDefaults,
        filename: path.join(LOGS_DIR, 'error-%DATE%.log'),
        level: 'error',
        maxFiles: '30d'
      })
    ]
  });
})();
// Redirect the console.* methods to the winston logger so that every message
// ends up in the same sinks. Arguments are rendered with util.format first,
// which keeps printf-style placeholders (%s, %d, ...) working.
for (const [consoleMethod, logLevel] of [
  ['log', 'info'],
  ['info', 'info'],
  ['warn', 'warn'],
  ['error', 'error']
]) {
  console[consoleMethod] = function (...args) {
    logger[logLevel](util.format(...args));
  };
}
// Last-resort handlers: log the failure instead of letting Node print it and
// exit. Note that the process keeps running after an uncaught exception.
process.on('uncaughtException', (err) => {
  logger.error(`Uncaught Exception: ${err.message}`, err);
});
// The rejection reason is folded into the message text. Passing the promise
// and the reason as additional logger arguments makes winston treat them as
// metadata/splat values, and the actual reason is then missing from the log.
process.on('unhandledRejection', (reason) => {
  const detail = reason instanceof Error
    ? (reason.stack || reason.message)
    : util.inspect(reason);
  logger.error(`Unhandled Rejection: ${detail}`);
});
// Folder (next to this script) used for downloaded sources and HLS output.
// It is created on start-up when it is not there yet.
const CACHE_DIR = path.join(__dirname, 'cache');
if (fs.existsSync(CACHE_DIR) === false) {
  fs.mkdirSync(CACHE_DIR, { recursive: true });
}
// Express app wrapped in an explicit http.Server instance.
const app = express();
const server = http.createServer(app);
// Listen port and address, overridable through the environment.
const PORT = process.env.PORT || 3000;
const HOST = process.env.HOST || process.env.LISTEN_ADDRESS || '0.0.0.0';
// Locations of the ffmpeg/ffprobe binaries (jellyfin-ffmpeg install paths by
// default), overridable through the environment.
const JELLYFIN_FFMPEG_PATH = process.env.JELLYFIN_FFMPEG_PATH || '/usr/lib/jellyfin-ffmpeg/ffmpeg';
const JELLYFIN_FFPROBE_PATH = process.env.JELLYFIN_FFPROBE_PATH || '/usr/lib/jellyfin-ffmpeg/ffprobe';
// Hand the paths to fluent-ffmpeg; each setter is only called when it exists.
for (const [setterName, binaryPath] of [
  ['setFfmpegPath', JELLYFIN_FFMPEG_PATH],
  ['setFfprobePath', JELLYFIN_FFPROBE_PATH]
]) {
  if (typeof ffmpeg[setterName] === 'function') {
    ffmpeg[setterName](binaryPath);
  }
}
// Redis/Valkey connection settings. VALKEY_* variables win over REDIS_*.
const redisUrl = process.env.VALKEY_URL || process.env.REDIS_URL || 'redis://localhost:6379';
const redisDb = parseInt(process.env.VALKEY_DB || process.env.REDIS_DB || '0', 10);
const redisPrefix = process.env.VALKEY_PREFIX || process.env.REDIS_PREFIX || '';
const redisClient = new Redis(redisUrl, {
  // An unparsable DB index falls back to database 0.
  db: Number.isNaN(redisDb) ? 0 : redisDb,
  // A non-empty prefix always ends with ':'; an empty prefix stays empty.
  keyPrefix: redisPrefix && !redisPrefix.endsWith(':') ? redisPrefix + ':' : redisPrefix
});
// Global middleware, registered in this order: CORS headers, JSON body
// parsing, then static files served from ./public.
for (const middleware of [cors(), express.json(), express.static('public')]) {
  app.use(middleware);
}
// S3_BUCKET_ADDRESS (or S3_BUCKET_NAME) may be a bare bucket name or a URL.
// For a URL, the origin becomes the fallback endpoint and the path (without
// leading slashes) becomes the default bucket name.
const rawBucketAddress = process.env.S3_BUCKET_ADDRESS || process.env.S3_BUCKET_NAME || '';
let BUCKET_NAME = rawBucketAddress;
const parsedBucketUrl = (() => {
  if (!rawBucketAddress.includes('://')) {
    return undefined;
  }
  let bucketUrl;
  try {
    bucketUrl = new URL(rawBucketAddress);
  } catch (err) {
    // Not a parsable URL: keep the raw value as bucket name, no endpoint.
    return undefined;
  }
  const bucketFromPath = bucketUrl.pathname.replace(/^\/+/, '');
  if (bucketFromPath) {
    BUCKET_NAME = bucketFromPath;
  }
  return bucketUrl.origin;
})();
// Baseline S3 client options; createS3Client copies them for every client.
const defaultS3ClientConfig = {
  // Path-style addressing is only enabled by the literal string "true".
  forcePathStyle: process.env.S3_FORCE_PATH_STYLE === 'true',
  // An explicit endpoint wins over the origin parsed from the bucket URL.
  endpoint: process.env.S3_ENDPOINT || parsedBucketUrl,
  region: process.env.AWS_REGION || 'us-east-1'
};
/**
 * Builds an S3 client from the shared default configuration.
 *
 * Credential precedence: the supplied username/password pair (used as access
 * key id / secret) when both are non-empty, otherwise AWS_ACCESS_KEY_ID and
 * AWS_SECRET_ACCESS_KEY when both are set, otherwise no explicit credentials.
 *
 * @param {{username?: string, password?: string}} [credentials]
 * @returns {S3Client}
 */
const createS3Client = (credentials) => {
  const { AWS_ACCESS_KEY_ID: envKeyId, AWS_SECRET_ACCESS_KEY: envSecret } = process.env;
  let resolvedCredentials;
  if (credentials && credentials.username && credentials.password) {
    resolvedCredentials = {
      accessKeyId: credentials.username,
      secretAccessKey: credentials.password
    };
  } else if (envKeyId && envSecret) {
    resolvedCredentials = {
      accessKeyId: envKeyId,
      secretAccessKey: envSecret
    };
  }
  const clientConfig = resolvedCredentials
    ? { ...defaultS3ClientConfig, credentials: resolvedCredentials }
    : { ...defaultS3ClientConfig };
  return new S3Client(clientConfig);
};
// In-memory runtime state, keyed by progress key.
const progressMap = {};                 // latest progress snapshot per key
const transcodeProcesses = new Map();   // running stream transcodes
const wsSubscriptions = new Map();      // Set of WebSocket clients per key
const activeDownloads = new Map();      // in-flight S3 download promises

// Encoder/decoder choices offered to the front end as { value, label } pairs.
const AVAILABLE_VIDEO_ENCODERS = [
  ['h264_rkmpp', 'H.264(RKMPP HighSpeed)'],
  ['hevc_rkmpp', 'H.265(RKMPP HighSpeed)'],
  ['libx264', 'H.264(Software Slow)'],
  ['libx265', 'H.265(Software Slow)']
].map(([value, label]) => ({ value, label }));
const AVAILABLE_VIDEO_DECODERS = [
  ['auto', 'Auto Select Decoder']
].map(([value, label]) => ({ value, label }));
/**
 * Normalises an object key into a progress key: every '/'-separated segment
 * keeps letters, digits, '_' and '-', and every other character becomes '_'.
 *
 * @param {string} key
 * @returns {string}
 */
const getProgressKey = (key) => {
  const sanitizeSegment = (segment) => segment.replace(/[^a-zA-Z0-9_\-]/g, '_');
  const sanitizedSegments = [];
  for (const segment of key.split('/')) {
    sanitizedSegments.push(sanitizeSegment(segment));
  }
  return sanitizedSegments.join('/');
};
/**
 * Creates an id for one streaming session: the current epoch milliseconds,
 * a dash, and up to eight base-36 characters taken from Math.random().
 *
 * @returns {string}
 */
const createStreamSessionId = () => {
  const timePart = Date.now();
  const randomPart = Math.random().toString(36).slice(2, 10);
  return `${timePart}-${randomPart}`;
};
/**
 * Subscribes a WebSocket client to a progress key and records the key on the
 * socket as `currentKey`.
 *
 * @param {string} progressKey
 * @param {object} ws
 */
const addWsClient = (progressKey, ws) => {
  let subscribers = wsSubscriptions.get(progressKey);
  if (!subscribers) {
    subscribers = new Set();
    wsSubscriptions.set(progressKey, subscribers);
  }
  subscribers.add(ws);
  ws.currentKey = progressKey;
};
/**
 * Unsubscribes a WebSocket client from the key stored in `ws.currentKey` and
 * drops the subscriber set once it is empty. `ws.currentKey` is left as is.
 *
 * @param {object} ws
 */
const removeWsClient = (ws) => {
  const subscribedKey = ws.currentKey;
  const subscribers = subscribedKey ? wsSubscriptions.get(subscribedKey) : undefined;
  if (!subscribers) {
    return;
  }
  subscribers.delete(ws);
  if (subscribers.size === 0) {
    wsSubscriptions.delete(subscribedKey);
  }
};
/**
 * Sends a JSON-encoded payload to every subscriber of `key` whose socket is
 * currently open. Does nothing when the key has no subscribers.
 *
 * @param {string} key
 * @param {object} payload
 */
const broadcastWs = (key, payload) => {
  const subscribers = wsSubscriptions.get(key);
  if (!subscribers) {
    return;
  }
  // Serialise once and reuse the string for every client.
  const encoded = JSON.stringify(payload);
  subscribers.forEach((socket) => {
    if (socket.readyState !== WebSocket.OPEN) {
      return;
    }
    socket.send(encoded);
  });
};
/**
 * Returns the encoder-specific quality/speed output options for ffmpeg.
 * The first matching rule wins; an unrecognised encoder yields an empty list.
 *
 * @param {string} encoderName
 * @returns {string[]} a fresh array on every call
 */
const createFfmpegOptions = (encoderName) => {
  const rules = [
    {
      applies: (name) => name === 'libx264' || name === 'libx265',
      flags: ['-preset', 'fast', '-crf', '23', '-threads', '0']
    },
    {
      applies: (name) => /_nvenc$/.test(name),
      flags: ['-preset', 'fast', '-rc:v', 'vbr_hq', '-cq', '19']
    },
    {
      applies: (name) => /_qsv$/.test(name),
      flags: ['-preset', 'fast', '-global_quality', '23']
    },
    {
      applies: (name) => /_vaapi$/.test(name),
      flags: ['-qp', '23']
    },
    {
      applies: (name) => /_rkmpp$/.test(name),
      flags: ['-qp_init', '23', '-pix_fmt', 'nv12']
    }
  ];
  const matchedRule = rules.find((rule) => rule.applies(encoderName));
  return matchedRule ? [...matchedRule.flags] : [];
};
// True when the codec name ends with "_rkmpp".
const isRkmppCodec = (codecName) => {
  const rkmppSuffix = /_rkmpp$/;
  return rkmppSuffix.test(codecName);
};
// True when the codec name ends with "_vaapi".
const isVaapiCodec = (codecName) => {
  const vaapiSuffix = /_vaapi$/;
  return vaapiSuffix.test(codecName);
};
// Sets of the accepted encoder/decoder identifiers, used to validate
// client-supplied values.
const availableEncoderValues = new Set(AVAILABLE_VIDEO_ENCODERS.map(({ value }) => value));
const availableDecoderValues = new Set(AVAILABLE_VIDEO_DECODERS.map(({ value }) => value));
/**
 * Picks the RKMPP hardware decoder that matches the first video stream of an
 * ffprobe result.
 *
 * The lookup uses a Map so that codec names which collide with
 * Object.prototype members (for example "constructor") cannot produce a
 * bogus non-null result, and a non-string codec_name is treated as unknown
 * instead of throwing.
 *
 * @param {{streams?: Array<{codec_type?: string, codec_name?: string}>}|null|undefined} metadata
 * @returns {string|null} decoder name, or null when no RKMPP decoder is known
 */
const getRkmppDecoderName = (metadata) => {
  const decoderByCodec = new Map([
    ['av1', 'av1_rkmpp'],
    ['h263', 'h263_rkmpp'],
    ['h264', 'h264_rkmpp'],
    ['hevc', 'hevc_rkmpp'],
    ['mjpeg', 'mjpeg_rkmpp'],
    ['mpeg1video', 'mpeg1_rkmpp'],
    ['mpeg2video', 'mpeg2_rkmpp'],
    ['mpeg4', 'mpeg4_rkmpp'],
    ['vp8', 'vp8_rkmpp'],
    ['vp9', 'vp9_rkmpp']
  ]);
  const videoStream = (metadata?.streams || []).find((stream) => stream.codec_type === 'video');
  const rawCodecName = videoStream?.codec_name;
  const codecName = typeof rawCodecName === 'string' ? rawCodecName.toLowerCase() : '';
  return decoderByCodec.get(codecName) || null;
};
/**
 * Parses a frame-rate string, either a fraction such as "30000/1001" or a
 * plain number such as "25".
 *
 * @param {*} fpsText
 * @returns {number} frames per second, or 0 for anything unusable
 *   (non-string, blank, non-numeric, or a zero denominator)
 */
const parseFpsValue = (fpsText) => {
  const isUsable = typeof fpsText === 'string' && fpsText.trim() !== '';
  if (!isUsable) {
    return 0;
  }
  if (!fpsText.includes('/')) {
    const plainValue = parseFloat(fpsText);
    return Number.isFinite(plainValue) ? plainValue : 0;
  }
  // Only the first two '/'-separated parts are considered.
  const fractionParts = fpsText.split('/');
  const top = parseFloat(fractionParts[0]);
  const bottom = parseFloat(fractionParts[1]);
  const isValidFraction = Number.isFinite(top) && Number.isFinite(bottom) && bottom !== 0;
  return isValidFraction ? top / bottom : 0;
};
/**
 * Builds the fragmented-MP4 and keyframe output options for a stream.
 *
 * The GOP size is twice the source frame rate (rounded, clamped to 12..60,
 * defaulting to 24 when the rate is unknown) and never below 24. The frame
 * rate comes from the first video stream's avg_frame_rate, falling back to
 * r_frame_rate.
 *
 * @param {string} encoderName
 * @param {{streams?: object[]}|null|undefined} metadata ffprobe result
 * @returns {string[]}
 */
const getSeekFriendlyOutputOptions = (encoderName, metadata) => {
  const streams = metadata?.streams || [];
  const videoStream = streams.find((stream) => stream.codec_type === 'video') || {};
  const sourceFps = parseFpsValue(videoStream.avg_frame_rate) || parseFpsValue(videoStream.r_frame_rate) || 24;
  const clampedFps = Math.min(Math.max(Math.round(sourceFps), 12), 60);
  const gopText = Math.max(24, clampedFps * 2).toString();

  // Encoder-specific keyframe flags appended after the common options.
  let keyframeFlags = [];
  if (encoderName === 'libx264' || encoderName === 'libx265') {
    keyframeFlags = ['-sc_threshold', '0', '-force_key_frames', 'expr:gte(t,n_forced*2)'];
  } else if (/_nvenc$/.test(encoderName)) {
    keyframeFlags = ['-forced-idr', '1', '-force_key_frames', 'expr:gte(t,n_forced*2)'];
  } else if (/_qsv$/.test(encoderName)) {
    keyframeFlags = ['-idr_interval', '1'];
  } else if (/_rkmpp$/.test(encoderName)) {
    keyframeFlags = ['-force_key_frames', 'expr:gte(t,n_forced*2)'];
  }

  return [
    '-movflags', 'frag_keyframe+empty_moov+default_base_moof+faststart',
    '-frag_duration', '1000000',
    '-min_frag_duration', '1000000',
    '-g', gopText,
    '-keyint_min', gopText,
    ...keyframeFlags
  ];
};
/**
 * Tells whether an ffmpeg error message matches one of the known
 * encoder/driver start-up failure patterns listed in the regex below.
 *
 * @param {string} message
 * @returns {boolean} false for an empty or missing message
 */
const shouldRetryWithSoftware = (message) => {
  const hardwareFailurePattern = /Cannot load libcuda\.so\.1|Could not open encoder before EOF|Error while opening encoder|Operation not permitted|Invalid argument|mpp_create|rkmpp/i;
  return message ? hardwareFailurePattern.test(message) : false;
};
/**
 * Promise wrapper around the callback-style `ffmpeg.ffprobe`.
 *
 * @param {string} filePath
 * @returns {Promise<object>} resolves with the probe metadata, rejects with
 *   the error reported by ffprobe
 */
const probeFile = (filePath) => new Promise((resolve, reject) => {
  const onProbed = (probeError, metadata) => {
    if (probeError) {
      reject(probeError);
      return;
    }
    resolve(metadata);
  };
  ffmpeg.ffprobe(filePath, onProbed);
});
/**
 * Converts a timemark of the form "HH:MM:SS(.fraction)" into seconds.
 * Text that does not split into exactly three ':'-separated parts is parsed
 * as a plain number instead.
 *
 * @param {*} timemark
 * @returns {number} seconds, or 0 when the value cannot be parsed
 */
const parseTimemarkToSeconds = (timemark) => {
  if (typeof timemark !== 'string' || !timemark.trim()) {
    return 0;
  }
  const fields = timemark.trim().split(':');
  if (fields.length !== 3) {
    const plainSeconds = parseFloat(timemark);
    return Number.isFinite(plainSeconds) ? plainSeconds : 0;
  }
  const hours = parseInt(fields[0], 10);
  const minutes = parseInt(fields[1], 10);
  const seconds = parseFloat(fields[2]);
  const allNumeric = Number.isFinite(hours) && Number.isFinite(minutes) && Number.isFinite(seconds);
  if (!allNumeric) {
    return 0;
  }
  return (hours * 3600) + (minutes * 60) + seconds;
};
/**
 * Stops the transcode registered under `progressKey`, if any.
 *
 * The command's progress/stderr/end/error listeners are detached before it
 * is killed with SIGKILL, and the registry entry is removed even when the
 * kill attempt throws.
 *
 * @param {string} progressKey
 * @returns {boolean} true when a registered command was found, false
 *   otherwise (in which case nothing is changed)
 */
const stopActiveTranscode = (progressKey) => {
  const command = transcodeProcesses.get(progressKey)?.command;
  if (!command) {
    return false;
  }
  try {
    if (typeof command.removeAllListeners === 'function') {
      for (const eventName of ['progress', 'stderr', 'end', 'error']) {
        command.removeAllListeners(eventName);
      }
    }
    if (typeof command.kill === 'function') {
      command.kill('SIGKILL');
    }
  } catch (killError) {
    console.warn(`Failed to kill transcode process for ${progressKey}:`, killError);
  } finally {
    transcodeProcesses.delete(progressKey);
  }
  return true;
};
/**
 * Resolves the S3 credentials for a request.
 *
 * A session id (x-session-id header, body or query) is looked up in Redis
 * first. When there is no stored session, or the lookup fails, the
 * credentials are read from the x-s3-username / x-s3-password headers, the
 * body, or the query string (username/accessKeyId, password/secretAccessKey).
 *
 * @param {object} req Express request
 * @returns {Promise<{username: string, password: string}>} empty strings
 *   when nothing usable was supplied; the username is trimmed
 */
const extractS3Credentials = async (req) => {
  const { headers, body } = req;
  const query = req.query || {};

  const sessionId = headers['x-session-id'] || body?.sessionId || query.sessionId || '';
  if (sessionId) {
    try {
      const storedSession = await redisClient.get(`session:${sessionId}`);
      if (storedSession) {
        const { username: sessionUser, password: sessionPassword } = JSON.parse(storedSession);
        return { username: sessionUser, password: sessionPassword };
      }
    } catch (e) {
      // Fall through to the explicitly supplied credentials.
      console.error('Session retrieval error:', e);
    }
  }

  const suppliedUser = headers['x-s3-username'] || body?.username || query.username || query.accessKeyId || '';
  const suppliedPassword = headers['x-s3-password'] || body?.password || query.password || query.secretAccessKey || '';
  const asText = (value, transform) => (typeof value === 'string' ? transform(value) : '');
  return {
    username: asText(suppliedUser, (text) => text.trim()),
    password: asText(suppliedPassword, (text) => text)
  };
};
// WebSocket endpoint attached to the HTTP server. A client sends
// {"type":"subscribe","key":"<object key>"} to follow one video; it is then
// registered for broadcasts and immediately receives the current state.
const wss = new WebSocket.Server({ server });
wss.on('connection', (ws) => {
  const sendJson = (payload) => ws.send(JSON.stringify(payload));

  // Switches this socket to `requestedKey` and replays the stored progress.
  const subscribeTo = (requestedKey) => {
    const progressKey = getProgressKey(requestedKey);
    if (ws.currentKey && ws.currentKey !== progressKey) {
      removeWsClient(ws);
    }
    addWsClient(progressKey, ws);

    const snapshot = progressMap[progressKey];
    if (!snapshot) {
      return;
    }
    if (typeof snapshot.duration === 'number' && snapshot.duration > 0) {
      sendJson({ type: 'duration', key: requestedKey, duration: snapshot.duration });
    }
    sendJson({ type: 'progress', key: requestedKey, progress: snapshot });
    if (snapshot.status === 'finished' && snapshot.mp4Url) {
      sendJson({ type: 'ready', key: requestedKey, mp4Url: snapshot.mp4Url });
    }
  };

  ws.on('message', (raw) => {
    try {
      const message = JSON.parse(raw.toString());
      const isSubscribe = message.type === 'subscribe' && typeof message.key === 'string';
      if (isSubscribe) {
        subscribeTo(message.key);
      }
    } catch (error) {
      console.error('WebSocket parse error:', error);
    }
  });
  ws.on('close', () => removeWsClient(ws));
});
/**
 * Makes sure the S3 object is available as a local file at `tmpInputPath`.
 *
 * Steps:
 *  1. If a download for the same progress key is already running, wait for it
 *     (its failure is ignored; this call then retries on its own).
 *  2. If the local file exists and its size equals the object's ContentLength
 *     (HEAD request), reuse it and publish a "downloaded" progress state.
 *  3. Otherwise stream the object into `<tmpInputPath>.downloading`, publish
 *     progress on every chunk, and rename the file into place on completion.
 *
 * @param {object} s3Client client used for the HEAD/GET requests
 * @param {string} bucket
 * @param {string} key object key (also echoed in broadcast messages)
 * @param {string} tmpInputPath final location of the cached source file
 * @param {string} progressKey key into progressMap / activeDownloads
 * @param {string} streamSessionId copied into every progress state
 * @returns {Promise<void>}
 * @throws rethrows any download or write error after removing the partial file
 */
const ensureS3Downloaded = async (s3Client, bucket, key, tmpInputPath, progressKey, streamSessionId) => {
  if (activeDownloads.has(progressKey)) {
    try {
      await activeDownloads.get(progressKey);
    } catch (err) {
      // Ignore error and retry if previous failed
    }
  }
  let shouldDownload = true;
  let s3Metadata = null;
  if (fs.existsSync(tmpInputPath)) {
    const stats = fs.statSync(tmpInputPath);
    const localSize = stats.size;
    try {
      const headCommand = new HeadObjectCommand({ Bucket: bucket, Key: key });
      s3Metadata = await s3Client.send(headCommand);
      if (s3Metadata.ContentLength === localSize) {
        shouldDownload = false;
        console.log(`[Cache] Verified ${key}: Local size matches S3 (${localSize} bytes).`);
      } else {
        console.log(`[Cache] Mismatch for ${key}: Local ${localSize} vs S3 ${s3Metadata.ContentLength}. Re-downloading.`);
      }
    } catch (err) {
      // Verification failed: fall through and download again.
      console.error(`[Cache] Failed to verify S3 metadata for ${key}:`, err.message);
    }
  }
  if (!shouldDownload) {
    progressMap[progressKey] = {
      status: 'downloaded',
      percent: 100,
      downloadedBytes: s3Metadata.ContentLength,
      totalBytes: s3Metadata.ContentLength,
      streamSessionId,
      details: 'Source cached locally and verified against S3...',
      mp4Url: null
    };
    broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] });
    return;
  }
  const downloadingPath = tmpInputPath + '.downloading';
  const downloadPromise = (async () => {
    const command = new GetObjectCommand({ Bucket: bucket, Key: key });
    const response = await s3Client.send(command);
    const totalBytes = response.ContentLength || 0;
    const s3Stream = response.Body;
    let downloadedBytes = 0;
    progressMap[progressKey] = {
      status: 'downloading',
      percent: 0,
      downloadedBytes: 0,
      totalBytes,
      streamSessionId,
      details: 'Downloading full source...',
      mp4Url: null
    };
    broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] });
    await new Promise((resolve, reject) => {
      const writeStream = fs.createWriteStream(downloadingPath);
      let failed = false;
      // pipe() does not close the destination when the source errors, so both
      // streams are destroyed explicitly. This releases the file descriptor
      // before the caller removes the partial file.
      const fail = (err) => {
        if (failed) return;
        failed = true;
        if (typeof s3Stream.destroy === 'function') {
          s3Stream.destroy();
        }
        writeStream.destroy();
        reject(err);
      };
      s3Stream.on('data', (chunk) => {
        downloadedBytes += chunk.length;
        const percent = totalBytes ? Math.min(100, Math.round((downloadedBytes / totalBytes) * 100)) : 0;
        const downloadState = {
          status: 'downloading',
          percent,
          downloadedBytes,
          totalBytes,
          streamSessionId,
          details: totalBytes ? `Downloading source ${percent}%` : 'Downloading source...',
          mp4Url: null
        };
        progressMap[progressKey] = downloadState;
        broadcastWs(progressKey, { type: 'progress', key, progress: downloadState });
      });
      s3Stream.on('error', fail);
      writeStream.on('error', fail);
      writeStream.on('finish', () => {
        try {
          // Only a fully written file is moved to the final cache path.
          fs.renameSync(downloadingPath, tmpInputPath);
          resolve();
        } catch (e) {
          reject(e);
        }
      });
      s3Stream.pipe(writeStream);
    });
    progressMap[progressKey] = {
      status: 'downloaded',
      percent: 100,
      downloadedBytes,
      totalBytes,
      streamSessionId,
      details: 'Source download complete...',
      mp4Url: null
    };
    broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] });
  })();
  activeDownloads.set(progressKey, downloadPromise);
  try {
    await downloadPromise;
  } catch (err) {
    if (fs.existsSync(downloadingPath)) {
      fs.rmSync(downloadingPath, { force: true });
    }
    throw err;
  } finally {
    activeDownloads.delete(progressKey);
  }
};
// GET /api/buckets — lists the buckets visible to the caller's credentials.
// Responds with { buckets } or, on failure, 500 with the error message.
app.get('/api/buckets', async (req, res) => {
  try {
    const s3Client = createS3Client(await extractS3Credentials(req));
    const listing = await s3Client.send(new ListBucketsCommand({}));
    res.json({ buckets: listing.Buckets || [] });
  } catch (error) {
    console.error('Error listing buckets:', error);
    res.status(500).json({ error: 'Failed to list buckets', detail: error.message });
  }
});
// Endpoint to list videos in the bucket
//
// GET /api/videos?bucket=<name> — pages through the whole bucket, keeps the
// keys with a known video extension and reports, per key, whether a
// downloaded source and an HLS output directory exist in the local cache.
// Responds with { videos: [{ key, hasTranscodeCache, hasDownloadCache }] }.
app.get('/api/videos', async (req, res) => {
  try {
    const bucket = req.query.bucket || BUCKET_NAME;
    if (!bucket) {
      return res.status(400).json({ error: 'Bucket name is required' });
    }
    const allObjects = [];
    const auth = await extractS3Credentials(req);
    const s3Client = createS3Client(auth);
    let continuationToken;
    do {
      const command = new ListObjectsV2Command({
        Bucket: bucket,
        ContinuationToken: continuationToken,
      });
      const response = await s3Client.send(command);
      allObjects.push(...(response.Contents || []));
      continuationToken = response.IsTruncated ? response.NextContinuationToken : undefined;
    } while (continuationToken);
    // Filter for a broader set of common video formats
    const videoExtensions = [
      '.3gp', '.3g2', '.asf', '.avi', '.divx', '.flv', '.m2ts', '.m2v',
      '.m4v', '.mkv', '.mov', '.mp4', '.mpeg', '.mpg', '.mts', '.mxf',
      '.ogm', '.ogv', '.qt', '.rm', '.rmvb', '.ts', '.vob', '.vro',
      '.webm', '.wmv'
    ];
    // The cache paths must be derived exactly like the HLS handlers derive
    // them: '-' and '_' are kept, the source file joins key segments with
    // '-', and the HLS directory joins them with '/'. Otherwise the flags
    // below are wrong for keys that contain '-' or sub-folders.
    const toSafeName = (text) => text.replace(/[^a-zA-Z0-9_\-]/g, '_');
    const safeBucket = toSafeName(bucket);
    const videos = allObjects
      .map((item) => item.Key)
      .filter((key) => {
        if (!key) return false;
        const lowerKey = key.toLowerCase();
        return videoExtensions.some((ext) => lowerKey.endsWith(ext));
      })
      .map((key) => {
        const safeKeySegments = key.split('/').map((segment) => toSafeName(segment));
        const hlsDir = path.join(CACHE_DIR, `hls-${safeBucket}-${safeKeySegments.join('/')}`);
        const tmpInputPath = path.join(CACHE_DIR, `s3-input-${safeBucket}-${safeKeySegments.join('-')}.tmp`);
        return {
          key,
          hasTranscodeCache: fs.existsSync(hlsDir),
          hasDownloadCache: fs.existsSync(tmpInputPath)
        };
      });
    res.json({ videos });
  } catch (error) {
    console.error('Error fetching videos:', error);
    res.status(500).json({ error: 'Failed to fetch videos from S3', detail: error.message });
  }
});
// POST /api/login — validates the supplied S3 credentials by listing buckets
// and, on success, stores them in Redis under a random session id.
// The credentials are stored as plain JSON with a TTL of 7 days.
// Responds with { success, sessionId, username, buckets }, 400 when a
// credential is missing, or 401 when anything in the login flow fails.
app.post('/api/login', async (req, res) => {
  // A request without a parsed body must end in the 400 below. Destructuring
  // an undefined body would throw inside this async handler and leave the
  // request without a response.
  const { username, password } = req.body || {};
  if (!username || !password) return res.status(400).json({ error: 'Missing credentials' });
  try {
    const s3Client = createS3Client({ username, password });
    const command = new ListBucketsCommand({});
    const response = await s3Client.send(command);
    const buckets = response.Buckets || [];
    const sessionId = crypto.randomBytes(32).toString('hex');
    await redisClient.set(`session:${sessionId}`, JSON.stringify({ username, password }), 'EX', 7 * 24 * 3600);
    res.json({ success: true, sessionId, username, buckets });
  } catch (error) {
    console.error('Login error:', error);
    res.status(401).json({ error: 'Login failed', detail: error.message });
  }
});
// POST /api/logout — deletes the caller's session from Redis.
// Logout is best effort: the response is always { success: true }, but a
// failed deletion is logged, because the stored credentials then remain
// until the session key expires.
app.post('/api/logout', async (req, res) => {
  const sessionId = req.headers['x-session-id'] || req.body?.sessionId;
  if (sessionId) {
    try {
      await redisClient.del(`session:${sessionId}`);
    } catch (e) {
      console.warn('Failed to delete session during logout:', e);
    }
  }
  res.json({ success: true });
});
// GET /api/config — static settings for the front end: page title, the
// configured ffmpeg/ffprobe paths and the selectable codecs with defaults.
app.get('/api/config', (req, res) => {
  const clientConfig = {
    title: process.env.APP_TITLE || 'S3 Media Transcoder',
    ffmpegPath: JELLYFIN_FFMPEG_PATH,
    ffprobePath: JELLYFIN_FFPROBE_PATH,
    defaultVideoEncoder: 'h264_rkmpp',
    defaultVideoDecoder: 'auto',
    videoEncoders: AVAILABLE_VIDEO_ENCODERS,
    videoDecoders: AVAILABLE_VIDEO_DECODERS
  };
  res.json(clientConfig);
});
// POST /api/clear-video-transcode-cache { bucket, key } — removes the HLS
// output directory of one video. Responds 400 when bucket or key is missing
// and 500 when the removal fails.
app.post('/api/clear-video-transcode-cache', async (req, res) => {
  try {
    const { bucket, key } = req.body;
    if (!bucket || !key) {
      return res.status(400).json({ error: 'Bucket and key are required' });
    }
    // The directory name must be derived exactly like the HLS segment handler
    // derives it ('-' and '_' kept, key segments joined with '/'). Otherwise
    // this endpoint looks at a different path and never deletes the cache.
    const toSafeName = (text) => text.replace(/[^a-zA-Z0-9_\-]/g, '_');
    const safeBucket = toSafeName(bucket);
    const safeKeySegments = key.split('/').map((segment) => toSafeName(segment));
    const hlsDir = path.join(CACHE_DIR, `hls-${safeBucket}-${safeKeySegments.join('/')}`);
    if (fs.existsSync(hlsDir)) {
      fs.rmSync(hlsDir, { recursive: true, force: true });
    }
    res.json({ message: 'Transcode cache cleared for video' });
  } catch (error) {
    console.error('Error clearing video transcode cache:', error);
    res.status(500).json({ error: 'Failed to clear transcode cache', detail: error.message });
  }
});
// POST /api/clear-video-download-cache { bucket, key } — removes the locally
// cached source file of one video. Responds 400 when bucket or key is
// missing and 500 when the removal fails.
app.post('/api/clear-video-download-cache', async (req, res) => {
  try {
    const { bucket, key } = req.body;
    if (!bucket || !key) {
      return res.status(400).json({ error: 'Bucket and key are required' });
    }
    // The file name must be derived exactly like the download/stream handlers
    // derive it ('-' and '_' are kept). Otherwise the cached file of any key
    // containing '-' is never found.
    const toSafeName = (text) => text.replace(/[^a-zA-Z0-9_\-]/g, '_');
    const safeBucket = toSafeName(bucket);
    const safeKeySegments = key.split('/').map((segment) => toSafeName(segment));
    const tmpInputPath = path.join(CACHE_DIR, `s3-input-${safeBucket}-${safeKeySegments.join('-')}.tmp`);
    if (fs.existsSync(tmpInputPath)) {
      fs.rmSync(tmpInputPath, { force: true });
    }
    res.json({ message: 'Download cache cleared for video' });
  } catch (error) {
    console.error('Error clearing video download cache:', error);
    res.status(500).json({ error: 'Failed to clear download cache', detail: error.message });
  }
});
// POST /api/stop-transcode { key } — kills the running transcode of one video
// and publishes a "cancelled" progress state to its subscribers.
// Responds 400 without a key and 404 when no transcode is registered.
app.post('/api/stop-transcode', (req, res) => {
  try {
    const videoKey = req.body.key;
    if (!videoKey) {
      return res.status(400).json({ error: 'Video key is required' });
    }
    const progressKey = getProgressKey(videoKey);
    const hasActiveTranscode = transcodeProcesses.has(progressKey);
    if (!hasActiveTranscode) {
      return res.status(404).json({ error: 'No active transcode found for this key' });
    }
    stopActiveTranscode(progressKey);
    const cancelledState = {
      status: 'cancelled',
      percent: 0,
      details: 'Transcode stopped by user',
      mp4Url: null
    };
    progressMap[progressKey] = cancelledState;
    broadcastWs(progressKey, { type: 'progress', key: videoKey, progress: cancelledState });
    res.json({ message: 'Transcode stopped' });
  } catch (error) {
    console.error('Error stopping transcode:', error);
    res.status(500).json({ error: 'Failed to stop transcode', detail: error.message });
  }
});
// Target duration of one HLS segment, in seconds.
const HLS_SEGMENT_TIME = 6;

/**
 * Polls every 200 ms until ffmpeg's working playlist (temp.m3u8) lists the
 * requested segment.
 *
 * @param {string} hlsDir directory holding temp.m3u8 and the segment files
 * @param {number} segIndex segment number to wait for
 * @param {number} [timeoutMs=45000] maximum time to wait
 * @returns {Promise<boolean>} true once the playlist mentions the segment;
 *   when the playlist is already closed with #EXT-X-ENDLIST, whether the
 *   segment file exists; false on timeout
 */
const waitForSegment = async (hlsDir, segIndex, timeoutMs = 45000) => {
  const segmentName = `segment_${segIndex}.ts`;
  const segmentPath = path.join(hlsDir, segmentName);
  const playlistPath = path.join(hlsDir, `temp.m3u8`);
  const pause = (ms) => new Promise((wake) => setTimeout(wake, ms));
  const startedAt = Date.now();

  while (Date.now() - startedAt < timeoutMs) {
    if (fs.existsSync(playlistPath)) {
      const playlistText = fs.readFileSync(playlistPath, 'utf8');
      if (playlistText.includes(segmentName)) {
        return true;
      }
      if (playlistText.includes(`#EXT-X-ENDLIST`)) {
        // The transcode is over: the segment either exists now or never will.
        return fs.existsSync(segmentPath);
      }
    }
    await pause(200);
  }
  return false;
};
// GET /api/hls/playlist.m3u8?bucket=&key=[&encoder=&decoder=]
// Downloads the source if needed, probes its duration and returns a complete
// VOD playlist whose entries point at /api/hls/segment.ts. When the duration
// cannot be determined a one-hour playlist is generated.
app.get('/api/hls/playlist.m3u8', async (req, res) => {
  const bucket = req.query.bucket;
  const key = req.query.key;
  // Repeated query parameters arrive as arrays; reject them here instead of
  // throwing on key.split() below, outside of any try block.
  if (!bucket || !key || typeof bucket !== 'string' || typeof key !== 'string') {
    return res.status(400).send('Bad Request');
  }
  const safeKeySegments = key.split('/').map(segment => segment.replace(/[^a-zA-Z0-9_\-]/g, '_'));
  const safeBucket = bucket.replace(/[^a-zA-Z0-9_\-]/g, '_');
  const tmpInputPath = path.join(CACHE_DIR, `s3-input-${safeBucket}-${safeKeySegments.join('-')}.tmp`);
  const auth = await extractS3Credentials(req);
  const s3Client = createS3Client(auth);
  const progressKey = getProgressKey(key);
  const streamSessionId = createStreamSessionId();
  try {
    await ensureS3Downloaded(s3Client, bucket, key, tmpInputPath, progressKey, streamSessionId);
  } catch (err) {
    console.error('S3 Download Failed:', err);
    return res.status(500).send('S3 Download Failed');
  }
  let duration = 0;
  try {
    const metadata = await probeFile(tmpInputPath);
    duration = parseFloat(metadata.format?.duration || 0);
  } catch (err) {
    console.warn(`Probe failed for ${key}, falling back to default duration:`, err);
  }
  // A non-numeric duration parses to NaN, which would otherwise produce a
  // playlist without any segment.
  if (!Number.isFinite(duration) || duration <= 0) duration = 3600;
  const totalSegments = Math.ceil(duration / HLS_SEGMENT_TIME);
  // Client-supplied codec names are escaped before they are written into the
  // playlist; non-string values fall back to the defaults.
  const pickText = (value, fallback) => (typeof value === 'string' && value ? value : fallback);
  const encoderParam = encodeURIComponent(pickText(req.query.encoder, 'h264_rkmpp'));
  const decoderParam = encodeURIComponent(pickText(req.query.decoder, 'auto'));
  const segmentUrlPrefix = `segment.ts?bucket=${encodeURIComponent(bucket)}&key=${encodeURIComponent(key)}`;
  let m3u8 = `#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-TARGETDURATION:${HLS_SEGMENT_TIME}\n#EXT-X-MEDIA-SEQUENCE:0\n#EXT-X-PLAYLIST-TYPE:VOD\n`;
  for (let i = 0; i < totalSegments; i++) {
    let segDur = HLS_SEGMENT_TIME;
    // The last segment only covers the remainder of the duration.
    if (i === totalSegments - 1 && duration % HLS_SEGMENT_TIME !== 0) {
      segDur = (duration % HLS_SEGMENT_TIME) || HLS_SEGMENT_TIME;
    }
    m3u8 += `#EXTINF:${segDur.toFixed(6)},\n${segmentUrlPrefix}&seg=${i}&encoder=${encoderParam}&decoder=${decoderParam}\n`;
  }
  m3u8 += `#EXT-X-ENDLIST\n`;
  res.setHeader('Content-Type', 'application/vnd.apple.mpegurl');
  res.setHeader('Cache-Control', 'no-cache');
  res.send(m3u8);
});
// Running HLS transcodes, keyed by progress key. Each entry holds the ffmpeg
// command, the highest segment served so far and the last request time.
const hlsProcesses = new Map();
// Watchdog: every 10 s, kill any HLS transcode whose last segment request is
// more than 30 s old, and drop it from the registry.
setInterval(() => {
  const now = Date.now();
  hlsProcesses.forEach((processInfo, key) => {
    const isStale = processInfo.lastActive && now - processInfo.lastActive > 30000;
    if (!isStale) {
      return;
    }
    try {
      if (processInfo.command) {
        processInfo.command.kill('SIGKILL');
      }
    } catch (e) {
      console.warn(`Failed to kill inactive HLS process for ${key}:`, e);
    }
    hlsProcesses.delete(key);
    console.log(`[Watchdog] Terminated inactive HLS transcode for ${key}`);
  });
}, 10000);
// GET /api/hls/segment.ts?bucket=&key=&seg=[&encoder=&decoder=]
// Serves one MPEG-TS segment. A segment that is already on disk and
// considered complete is sent directly. Otherwise the handler reuses the
// ffmpeg process registered for this video, or starts a new one at the
// requested position, and waits until the segment shows up in temp.m3u8.
// NOTE(review): this handler never downloads the source; it presumably relies
// on the playlist request having created tmpInputPath — confirm.
app.get('/api/hls/segment.ts', async (req, res) => {
const bucket = req.query.bucket;
const key = req.query.key;
// NOTE(review): parseInt is called without a radix here.
const seg = parseInt(req.query.seg || '0');
const requestedEncoder = req.query.encoder || 'h264_rkmpp';
const requestedDecoder = req.query.decoder || 'auto';
if (!bucket || !key || isNaN(seg)) return res.status(400).send('Bad Request');
const safeKeySegments = key.split('/').map(segment => segment.replace(/[^a-zA-Z0-9_\-]/g, '_'));
const safeBucket = bucket.replace(/[^a-zA-Z0-9_\-]/g, '_');
const tmpInputPath = path.join(CACHE_DIR, `s3-input-${safeBucket}-${safeKeySegments.join('-')}.tmp`);
// Same sanitisation as getProgressKey: segments are joined with '/', so the
// HLS directory below is nested for keys that contain sub-folders.
const progressKey = safeKeySegments.join('/');
const hlsDir = path.join(CACHE_DIR, `hls-${safeBucket}-${progressKey}`);
if (!fs.existsSync(hlsDir)) fs.mkdirSync(hlsDir, { recursive: true });
const targetSegPath = path.join(hlsDir, `segment_${seg}.ts`);
let currentProcess = hlsProcesses.get(progressKey);
if (currentProcess) {
// Refresh the activity timestamp that the watchdog checks.
currentProcess.lastActive = Date.now();
}
// Decides whether the segment file on disk can be served as is.
const checkIsCachedAndCompleted = () => {
if (!fs.existsSync(targetSegPath)) return false;
const m3u8Path = path.join(hlsDir, `temp.m3u8`);
// Listed in the working playlist: treated as completely written.
if (fs.existsSync(m3u8Path) && fs.readFileSync(m3u8Path, 'utf8').includes(`segment_${seg}.ts`)) return true;
// More than 3 segments away from the position of the running process:
// treated as not being written right now.
if (currentProcess && Math.abs((currentProcess.currentSeg || 0) - seg) > 3) return true;
// If there's no active process, any existing file is from a past complete run
if (!currentProcess) return true;
return false;
};
if (checkIsCachedAndCompleted()) {
if (currentProcess) currentProcess.currentSeg = Math.max(currentProcess.currentSeg, seg);
res.setHeader('Content-Type', 'video/MP2T');
return res.sendFile(targetSegPath);
}
// A new ffmpeg run is needed when none is registered, or when the missing
// segment lies behind the current position or more than 4 segments ahead.
const needsNewProcess = !currentProcess || (!fs.existsSync(targetSegPath) && (seg < (currentProcess.currentSeg || 0) || seg > (currentProcess.currentSeg || 0) + 4));
if (needsNewProcess) {
if (currentProcess && currentProcess.command) {
// Kill failures are ignored; the registry entry is overwritten below.
try { currentProcess.command.kill('SIGKILL'); } catch (e) { }
}
// Start position in seconds of the requested segment.
const startTime = Math.max(0, seg * HLS_SEGMENT_TIME);
let sourceMetadata = null;
// Probe failures are ignored; the metadata is only used for decoder
// selection and the progress percentage.
try { sourceMetadata = await probeFile(tmpInputPath); } catch (e) { }
// Unknown encoder/decoder values fall back to the defaults.
const encoderName = availableEncoderValues.has(requestedEncoder) ? requestedEncoder : 'h264_rkmpp';
const decoderName = availableDecoderValues.has(requestedDecoder) ? requestedDecoder : 'auto';
const m3u8Path = path.join(hlsDir, `temp.m3u8`);
// Remove the previous working playlist so that waitForSegment only sees
// entries written by the run started below.
if (fs.existsSync(m3u8Path)) fs.unlinkSync(m3u8Path);
const ffmpegCommand = ffmpeg().input(tmpInputPath);
if (startTime > 0) ffmpegCommand.seekInput(startTime);
ffmpegCommand.videoCodec(encoderName).audioCodec('aac');
if (isVaapiCodec(encoderName)) {
ffmpegCommand.inputOptions(['-vaapi_device', '/dev/dri/renderD128']).videoFilters('format=nv12,hwupload');
}
// With decoder "auto" and an RKMPP encoder, pick the RKMPP decoder that
// matches the source codec (null when there is none).
const resolvedDecoderName = decoderName === 'auto' && isRkmppCodec(encoderName) ? getRkmppDecoderName(sourceMetadata) : decoderName;
if (resolvedDecoderName && resolvedDecoderName !== 'auto') ffmpegCommand.inputOptions(['-c:v', resolvedDecoderName]);
const segmentFilename = path.join(hlsDir, `segment_%d.ts`);
// -start_number makes the first segment written by this run carry the
// requested index.
const hlsOptions = createFfmpegOptions(encoderName).concat([
'-f', 'hls',
'-hls_time', HLS_SEGMENT_TIME.toString(),
'-hls_list_size', '0',
'-hls_segment_filename', segmentFilename,
'-start_number', seg.toString(),
'-copyts',
'-avoid_negative_ts', 'disabled',
'-muxdelay', '0',
'-muxpreload', '0'
]);
ffmpegCommand.outputOptions(hlsOptions).output(m3u8Path);
// Errors are only logged; the pending request then fails through the
// waitForSegment timeout below.
ffmpegCommand.on('error', (err) => {
console.error('HLS FFmpeg Error:', err.message);
});
// Publish the progress relative to the whole source, not to this run.
ffmpegCommand.on('progress', (progress) => {
const timemarkSeconds = parseTimemarkToSeconds(progress.timemark || '0');
const absoluteSeconds = startTime + (isFinite(timemarkSeconds) ? timemarkSeconds : 0);
const totalDuration = parseFloat(sourceMetadata?.format?.duration || 0);
let percent = 0;
if (totalDuration > 0) {
percent = Math.min(Math.max(Math.round((absoluteSeconds / totalDuration) * 100), 0), 100);
}
const progressState = {
status: 'transcoding',
percent,
frame: progress.frames || null,
fps: progress.currentFps || null,
bitrate: progress.currentKbps || null,
timemark: progress.timemark || null,
absoluteSeconds,
duration: totalDuration || null,
startSeconds: startTime,
details: `处理进度 ${percent}%`,
mp4Url: null
};
progressMap[progressKey] = progressState;
broadcastWs(progressKey, { type: 'progress', key, progress: progressState });
console.log(`[FFmpeg] ${progressKey} | ${progress.timemark} | ${progress.currentFps}fps | ${progress.currentKbps}kbps | ${percent}%`);
});
ffmpegCommand.on('end', () => {
console.log(`[FFmpeg] ${progressKey} HLS transcode completed.`);
});
ffmpegCommand.run();
// Register the new run; this replaces any previous entry for the key.
currentProcess = { command: ffmpegCommand, currentSeg: seg, lastActive: Date.now() };
hlsProcesses.set(progressKey, currentProcess);
}
// Wait (default timeout of waitForSegment) for the segment to be listed.
const ready = await waitForSegment(hlsDir, seg);
if (!ready) {
return res.status(500).send('Segment generation timeout');
}
if (currentProcess) {
currentProcess.currentSeg = Math.max(currentProcess.currentSeg, seg);
currentProcess.lastActive = Date.now();
}
res.setHeader('Content-Type', 'video/MP2T');
res.sendFile(targetSegPath);
});
app.get('/api/stream', async (req, res) => {
const bucket = req.query.bucket;
const key = req.query.key;
const requestedDecoder = typeof req.query.decoder === 'string' ? req.query.decoder.trim() : 'auto';
const requestedEncoder = typeof req.query.encoder === 'string' ? req.query.encoder.trim() : 'h264_rkmpp';
const startSeconds = parseFloat(req.query.ss) || 0;
const streamSessionId = typeof req.query.streamSessionId === 'string' && req.query.streamSessionId.trim()
? req.query.streamSessionId.trim()
: createStreamSessionId();
if (!bucket) {
return res.status(400).json({ error: 'Bucket name is required' });
}
if (!key) {
return res.status(400).json({ error: 'Video key is required' });
}
const videoEncoder = availableEncoderValues.has(requestedEncoder) ? requestedEncoder : 'h264_rkmpp';
const requestedVideoDecoder = availableDecoderValues.has(requestedDecoder) ? requestedDecoder : 'auto';
const safeKeySegments = key.split('/').map(segment => segment.replace(/[^a-zA-Z0-9_\-]/g, '_'));
const progressKey = safeKeySegments.join('/');
const safeBucket = bucket.replace(/[^a-zA-Z0-9_\-]/g, '_');
const tmpInputPath = path.join(CACHE_DIR, `s3-input-${safeBucket}-${safeKeySegments.join('-')}.tmp`);
const cacheExists = fs.existsSync(tmpInputPath);
const auth = await extractS3Credentials(req);
const s3Client = createS3Client(auth);
try {
const replacedExistingStream = stopActiveTranscode(progressKey);
if (replacedExistingStream && startSeconds > 0) {
progressMap[progressKey] = {
...(progressMap[progressKey] || {}),
status: 'transcoding',
percent: Math.min(Math.max(Math.round((startSeconds / Math.max(progressMap[progressKey]?.duration || startSeconds || 1, 1)) * 100), 0), 100),
startSeconds,
streamSessionId,
details: `Restarting transcode from ${startSeconds.toFixed(2)}s`,
mp4Url: null
};
broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] });
}
await ensureS3Downloaded(s3Client, bucket, key, tmpInputPath, progressKey, streamSessionId);
// Probe file for duration and broadcast to clients
let sourceMetadata = null;
try {
const metadata = await probeFile(tmpInputPath);
sourceMetadata = metadata;
const duration = metadata.format?.duration || 0;
progressMap[progressKey] = {
...(progressMap[progressKey] || {}),
duration: parseFloat(duration) || 0,
streamSessionId
};
broadcastWs(progressKey, { type: 'duration', key, duration: parseFloat(duration) });
} catch (probeErr) {
console.error('Probe failed:', probeErr);
}
res.setHeader('Content-Type', 'video/mp4');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
let ffmpegCommand = null;
/**
 * Builds the ffmpeg command for this request, registers it as the active
 * transcode for `progressKey`, wires progress/lifecycle events to the
 * progress map and WebSocket broadcast, and pipes the MP4 output to `res`.
 *
 * @param {string} encoderName - Video codec passed to ffmpeg's `videoCodec`.
 * @param {string} decoderName - Requested input decoder, or 'auto'.
 */
const startStream = (encoderName, decoderName) => {
    const encoderOptions = createFfmpegOptions(encoderName);
    const seekFriendlyOptions = getSeekFriendlyOutputOptions(encoderName, sourceMetadata);
    const streamingOptions = encoderOptions.concat(seekFriendlyOptions);

    const command = ffmpeg()
        .input(tmpInputPath)
        .videoCodec(encoderName)
        .audioCodec('aac')
        .outputOptions(streamingOptions)
        .format('mp4');
    // Publish the command to the enclosing scope so the response 'close'
    // handler can reach it.
    ffmpegCommand = command;

    // Seek on the input side when a start offset was requested.
    if (startSeconds > 0) {
        const supportsSeekInput = typeof command.seekInput === 'function';
        if (supportsSeekInput) {
            command.seekInput(startSeconds);
        } else {
            command.inputOptions(['-ss', startSeconds.toString()]);
        }
    }

    if (isVaapiCodec(encoderName)) {
        command
            .inputOptions(['-vaapi_device', '/dev/dri/renderD128'])
            .videoFilters('format=nv12,hwupload');
    }

    // With an 'auto' decoder and an RKMPP encoder, the decoder is picked
    // from the probed source metadata; otherwise the requested one is used.
    let resolvedDecoderName = decoderName;
    if (decoderName === 'auto' && isRkmppCodec(encoderName)) {
        resolvedDecoderName = getRkmppDecoderName(sourceMetadata);
    }
    if (resolvedDecoderName && resolvedDecoderName !== 'auto') {
        command.inputOptions(['-c:v', resolvedDecoderName]);
    }

    // True when this stream is no longer the one registered for progressKey
    // (it was replaced or already removed); its events are then ignored.
    const isSuperseded = () =>
        transcodeProcesses.get(progressKey)?.streamSessionId !== streamSessionId;

    // Rounds and clamps a value into the 0..100 range.
    const clampPercent = (value) => Math.min(Math.max(Math.round(value), 0), 100);

    const handleProgress = (progress) => {
        if (isSuperseded()) {
            return;
        }
        const knownDuration = progressMap[progressKey]?.duration;
        const effectiveDuration = Math.max(0, (knownDuration || 0) - startSeconds);
        const timemarkSeconds = parseTimemarkToSeconds(progress.timemark || '0');
        const elapsedSeconds = isFinite(timemarkSeconds) ? timemarkSeconds : 0;
        const absoluteSeconds = startSeconds + elapsedSeconds;

        // Prefer a position-based percentage over the full duration; fall
        // back to ffmpeg's own percent when no usable duration is known.
        let percent;
        if (effectiveDuration > 0) {
            percent = clampPercent((absoluteSeconds / (knownDuration || effectiveDuration)) * 100);
        } else {
            percent = clampPercent(progress.percent || 0);
        }

        const progressState = {
            status: 'transcoding',
            percent,
            frame: progress.frames || null,
            fps: progress.currentFps || null,
            bitrate: progress.currentKbps || null,
            timemark: progress.timemark || null,
            absoluteSeconds,
            duration: knownDuration || null,
            startSeconds,
            streamSessionId,
            details: `Streaming transcode ${percent}%`,
            mp4Url: null
        };
        progressMap[progressKey] = progressState;
        broadcastWs(progressKey, { type: 'progress', key, progress: progressState });
    };

    const handleStderr = (stderrLine) => {
        if (isSuperseded()) {
            return;
        }
        console.log(`ffmpeg stderr: ${stderrLine}`);
    };

    const handleEnd = () => {
        if (isSuperseded()) {
            return;
        }
        transcodeProcesses.delete(progressKey);
        const finishedState = {
            status: 'finished',
            percent: 100,
            duration: progressMap[progressKey]?.duration || null,
            startSeconds,
            streamSessionId,
            details: 'Streaming transcode complete',
            mp4Url: null
        };
        progressMap[progressKey] = finishedState;
        broadcastWs(progressKey, { type: 'progress', key, progress: finishedState });
    };

    const handleError = (err) => {
        if (isSuperseded()) {
            return;
        }
        transcodeProcesses.delete(progressKey);
        const previousState = progressMap[progressKey];
        const failedState = {
            status: 'failed',
            percent: previousState?.percent || 0,
            duration: previousState?.duration || null,
            startSeconds,
            streamSessionId,
            details: err.message || 'Streaming transcode failed',
            mp4Url: null
        };
        progressMap[progressKey] = failedState;
        broadcastWs(progressKey, { type: 'progress', key, progress: failedState });

        // A JSON error can only be sent while headers are unsent; otherwise
        // the response is torn down with the error.
        if (res.headersSent) {
            res.destroy(err);
        } else {
            res.status(500).json({ error: 'Failed to stream transcoded video', detail: err.message });
        }
    };

    transcodeProcesses.set(progressKey, { command, streamSessionId });

    command
        .on('progress', handleProgress)
        .on('stderr', handleStderr)
        .on('end', handleEnd)
        .on('error', handleError);

    command.pipe(res, { end: true });
};
// When the response closes, stop the ffmpeg command created for this request
// and drop its registry entry if this session still owns it.
res.on('close', () => {
    const canKill = typeof ffmpegCommand?.kill === 'function';
    if (canKill) {
        try {
            ffmpegCommand.kill('SIGKILL');
        } catch (_) {
            // Best-effort: a failure to kill is deliberately ignored.
        }
    }

    const activeEntry = transcodeProcesses.get(progressKey);
    if (activeEntry?.streamSessionId === streamSessionId) {
        transcodeProcesses.delete(progressKey);
    }
});
startStream(videoEncoder, requestedVideoDecoder);
} catch (error) {
console.error('Error in stream:', error);
if (!res.headersSent) {
res.status(500).json({ error: 'Failed to stream video', detail: error.message });
} else {
res.destroy(error);
}
}
});
// Bind the server to HOST:PORT and log the address once it is listening.
server.listen(PORT, HOST, function onListening() {
    console.log(`Server running on http://${HOST}:${PORT}`);
});