修复冲突
This commit is contained in:
85
server.js
85
server.js
@@ -353,8 +353,9 @@ const sanitizeNumber = (value) => {
|
||||
return Number.isFinite(num) ? num : null;
|
||||
};
|
||||
|
||||
const stopActiveTranscode = (progressKey) => {
|
||||
const activeProcess = transcodeProcesses.get(progressKey);
|
||||
const stopActiveTranscode = (progressKey, sessionId = null) => {
|
||||
const processKey = sessionId ? `${progressKey}:${sessionId}` : progressKey;
|
||||
const activeProcess = transcodeProcesses.get(processKey);
|
||||
if (!activeProcess?.command) {
|
||||
return false;
|
||||
}
|
||||
@@ -370,9 +371,9 @@ const stopActiveTranscode = (progressKey) => {
|
||||
activeProcess.command.kill('SIGKILL');
|
||||
}
|
||||
} catch (killError) {
|
||||
console.warn(`Failed to kill transcode process for ${progressKey}:`, killError);
|
||||
console.warn(`Failed to kill transcode process for ${processKey}:`, killError);
|
||||
} finally {
|
||||
transcodeProcesses.delete(progressKey);
|
||||
transcodeProcesses.delete(processKey);
|
||||
}
|
||||
|
||||
return true;
|
||||
@@ -442,11 +443,19 @@ const ensureS3Downloaded = async (s3Client, bucket, key, tmpInputPath, progressK
|
||||
if (activeDownloads.has(progressKey)) {
|
||||
try {
|
||||
await activeDownloads.get(progressKey);
|
||||
return; // Already downloaded by another request
|
||||
} catch (err) {
|
||||
// Ignore error and retry if previous failed
|
||||
}
|
||||
}
|
||||
|
||||
const downloadingPath = tmpInputPath + '.downloading';
|
||||
// Cleanup stale downloading file if no active download is tracked
|
||||
if (fs.existsSync(downloadingPath)) {
|
||||
fs.rmSync(downloadingPath, { force: true });
|
||||
}
|
||||
|
||||
|
||||
let shouldDownload = true;
|
||||
let s3Metadata = null;
|
||||
|
||||
@@ -752,17 +761,19 @@ const HLS_SEGMENT_TIME = 6;
|
||||
const waitForSegment = async (hlsDir, segIndex, timeoutMs = 45000) => {
|
||||
const start = Date.now();
|
||||
const segPath = path.join(hlsDir, `segment_${segIndex}.ts`);
|
||||
const m3u8Path = path.join(hlsDir, `temp.m3u8`);
|
||||
|
||||
|
||||
// Check all m3u8 files in the directory to see if any process has finished this segment
|
||||
while (Date.now() - start < timeoutMs) {
|
||||
if (fs.existsSync(m3u8Path)) {
|
||||
const m3u8Content = fs.readFileSync(m3u8Path, 'utf8');
|
||||
if (m3u8Content.includes(`segment_${segIndex}.ts`)) {
|
||||
return true;
|
||||
}
|
||||
if (m3u8Content.includes(`#EXT-X-ENDLIST`)) {
|
||||
if (fs.existsSync(segPath)) return true;
|
||||
return false;
|
||||
if (fs.existsSync(segPath)) {
|
||||
// Even if .ts exists, ensure it's logged in some m3u8 to confirm it's ready
|
||||
const files = fs.readdirSync(hlsDir);
|
||||
for (const file of files) {
|
||||
if (file.endsWith('.m3u8')) {
|
||||
const m3u8Content = fs.readFileSync(path.join(hlsDir, file), 'utf8');
|
||||
if (m3u8Content.includes(`segment_${segIndex}.ts`)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
await new Promise(r => setTimeout(r, 200));
|
||||
@@ -879,10 +890,11 @@ setInterval(() => {
|
||||
}
|
||||
}, 10000);
|
||||
|
||||
const startHlsTranscode = async (bucket, key, seg, requestedEncoder, requestedDecoder, subtitleIndex, isPersistent = false) => {
|
||||
const startHlsTranscode = async (bucket, key, seg, requestedEncoder, requestedDecoder, subtitleIndex, isPersistent = false, overrideProgressKey = null) => {
|
||||
const baseProgressKey = getProgressKey(key);
|
||||
const subtitleSuffix = (subtitleIndex !== null && subtitleIndex !== undefined && subtitleIndex !== '-1') ? `-sub${subtitleIndex}` : '';
|
||||
const progressKey = `${baseProgressKey}${subtitleSuffix}`;
|
||||
const progressKey = overrideProgressKey || `${baseProgressKey}${subtitleSuffix}`;
|
||||
|
||||
const hlsDir = getHlsCacheDir(bucket, key, subtitleIndex);
|
||||
const tmpInputPath = getInputCachePath(bucket, key);
|
||||
|
||||
@@ -890,10 +902,22 @@ const startHlsTranscode = async (bucket, key, seg, requestedEncoder, requestedDe
|
||||
|
||||
let currentProcess = hlsProcesses.get(progressKey);
|
||||
if (currentProcess && currentProcess.command) {
|
||||
// If the current process is persistent and we are starting a non-persistent one,
|
||||
// don't kill it. We'll spawn a side-process instead.
|
||||
if (currentProcess.persistent && !isPersistent) {
|
||||
console.log(`[HLS] Persistent process running for ${progressKey}, spawning side-process for seek.`);
|
||||
// Change progressKey for the side-process to avoid killing the persistent one
|
||||
// Use a session-specific or random suffix
|
||||
const sideKey = `${progressKey}:side:${Date.now()}`;
|
||||
const sideProcess = await startHlsTranscode(bucket, key, seg, requestedEncoder, requestedDecoder, subtitleIndex, false, sideKey);
|
||||
return sideProcess;
|
||||
}
|
||||
|
||||
console.log(`[HLS] Killing previous FFmpeg process for ${progressKey}`);
|
||||
try { currentProcess.command.kill('SIGKILL'); } catch (e) { }
|
||||
}
|
||||
|
||||
|
||||
const startTime = Math.max(0, seg * HLS_SEGMENT_TIME);
|
||||
|
||||
let sourceMetadata = null;
|
||||
@@ -908,9 +932,11 @@ const startHlsTranscode = async (bucket, key, seg, requestedEncoder, requestedDe
|
||||
|
||||
console.log(`[HLS] FFmpeg config: encoder=${encoderName}, decoder=${decoderName}, startTime=${startTime}s, subtitleIndex=${subtitleIndex}, persistent=${isPersistent}`);
|
||||
|
||||
const m3u8Path = path.join(hlsDir, `temp.m3u8`);
|
||||
const m3u8Name = isPersistent ? 'temp.m3u8' : `temp_${progressKey.replace(/[^a-zA-Z0-9]/g, '_')}.m3u8`;
|
||||
const m3u8Path = path.join(hlsDir, m3u8Name);
|
||||
if (fs.existsSync(m3u8Path)) fs.unlinkSync(m3u8Path);
|
||||
|
||||
|
||||
const ffmpegCommand = ffmpeg().input(tmpInputPath);
|
||||
if (startTime > 0) ffmpegCommand.seekInput(startTime);
|
||||
|
||||
@@ -996,8 +1022,11 @@ const startHlsTranscode = async (bucket, key, seg, requestedEncoder, requestedDe
|
||||
details: isPersistent ? `后台预切片中 ${percent}%` : `处理进度 ${percent}%`,
|
||||
mp4Url: null
|
||||
};
|
||||
const broadcastRoomId = `${baseProgressKey}${subtitleSuffix}`;
|
||||
const broadcastKey = subtitleIndex && subtitleIndex !== '-1' ? `${key}-sub${subtitleIndex}` : key;
|
||||
broadcastWs(progressKey, { type: 'progress', key: broadcastKey, progress: progressState });
|
||||
broadcastWs(broadcastRoomId, { type: 'progress', key: broadcastKey, progress: progressState });
|
||||
|
||||
|
||||
|
||||
if (!isPersistent) {
|
||||
console.log(`[FFmpeg] ${progressKey} | ${progress.timemark} | ${sanitizeNumber(progress.currentFps) ?? '-'}fps | ${sanitizeNumber(progress.currentKbps) ?? '-'}kbps | ${percent}%`);
|
||||
@@ -1006,8 +1035,9 @@ const startHlsTranscode = async (bucket, key, seg, requestedEncoder, requestedDe
|
||||
|
||||
ffmpegCommand.on('end', () => {
|
||||
console.log(`[FFmpeg] ${progressKey} HLS transcode completed.`);
|
||||
const broadcastRoomId = `${baseProgressKey}${subtitleSuffix}`;
|
||||
const broadcastKey = subtitleIndex && subtitleIndex !== '-1' ? `${key}-sub${subtitleIndex}` : key;
|
||||
broadcastWs(progressKey, {
|
||||
broadcastWs(broadcastRoomId, {
|
||||
type: 'progress',
|
||||
key: broadcastKey,
|
||||
progress: {
|
||||
@@ -1020,8 +1050,8 @@ const startHlsTranscode = async (bucket, key, seg, requestedEncoder, requestedDe
|
||||
});
|
||||
|
||||
ffmpegCommand.run();
|
||||
console.log(`[HLS] FFmpeg process started for ${progressKey} (persistent=${isPersistent})`);
|
||||
const newProcessInfo = { command: ffmpegCommand, currentSeg: seg, lastActive: Date.now(), persistent: isPersistent };
|
||||
console.log(`[HLS] FFmpeg process started for ${progressKey} (persistent=${isPersistent}, m3u8=${m3u8Name})`);
|
||||
const newProcessInfo = { command: ffmpegCommand, currentSeg: seg, lastActive: Date.now(), persistent: isPersistent, m3u8Name };
|
||||
hlsProcesses.set(progressKey, newProcessInfo);
|
||||
return newProcessInfo;
|
||||
};
|
||||
@@ -1160,7 +1190,8 @@ app.get('/api/stream', async (req, res) => {
|
||||
const s3Client = createS3Client(auth);
|
||||
|
||||
try {
|
||||
const replacedExistingStream = stopActiveTranscode(progressKey);
|
||||
const replacedExistingStream = stopActiveTranscode(progressKey, streamSessionId);
|
||||
|
||||
if (replacedExistingStream && startSeconds > 0) {
|
||||
progressMap[progressKey] = {
|
||||
...(progressMap[progressKey] || {}),
|
||||
@@ -1229,7 +1260,9 @@ app.get('/api/stream', async (req, res) => {
|
||||
ffmpegCommand.inputOptions(['-c:v', resolvedDecoderName]);
|
||||
}
|
||||
|
||||
transcodeProcesses.set(progressKey, { command: ffmpegCommand, streamSessionId });
|
||||
const processKey = `${progressKey}:${streamSessionId}`;
|
||||
transcodeProcesses.set(processKey, { command: ffmpegCommand, streamSessionId });
|
||||
|
||||
|
||||
ffmpegCommand
|
||||
.on('progress', (progress) => {
|
||||
@@ -1313,9 +1346,11 @@ app.get('/api/stream', async (req, res) => {
|
||||
ffmpegCommand.kill('SIGKILL');
|
||||
} catch (_) { }
|
||||
}
|
||||
if (transcodeProcesses.get(progressKey)?.streamSessionId === streamSessionId) {
|
||||
transcodeProcesses.delete(progressKey);
|
||||
const processKey = `${progressKey}:${streamSessionId}`;
|
||||
if (transcodeProcesses.get(processKey)?.streamSessionId === streamSessionId) {
|
||||
transcodeProcesses.delete(processKey);
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
startStream(videoEncoder, requestedVideoDecoder);
|
||||
|
||||
Reference in New Issue
Block a user