diff --git a/server.js b/server.js index a8ad4f2..84838ac 100644 --- a/server.js +++ b/server.js @@ -933,11 +933,12 @@ const startHlsTranscode = async (bucket, key, seg, requestedEncoder, requestedDe console.log(`[HLS] FFmpeg config: encoder=${encoderName}, decoder=${decoderName}, startTime=${startTime}s, subtitleIndex=${subtitleIndex}, persistent=${isPersistent}`); const m3u8Name = isPersistent ? 'temp.m3u8' : `temp_${progressKey.replace(/[^a-zA-Z0-9]/g, '_')}.m3u8`; - const m3u8Path = path.join(hlsDir, m3u8Name); - if (fs.existsSync(m3u8Path)) fs.unlinkSync(m3u8Path); + const m3u8Path = path.join(hlsDir, m3u8Name).replace(/\\/g, '/'); + if (fs.existsSync(m3u8Path)) try { fs.unlinkSync(m3u8Path); } catch (e) {} + const normalizedInputPath = tmpInputPath.replace(/\\/g, '/'); + const ffmpegCommand = ffmpeg().input(normalizedInputPath); - const ffmpegCommand = ffmpeg().input(tmpInputPath); if (startTime > 0) ffmpegCommand.seekInput(startTime); ffmpegCommand.videoCodec(encoderName).audioCodec('aac'); @@ -1008,9 +1009,16 @@ const startHlsTranscode = async (bucket, key, seg, requestedEncoder, requestedDe progressMap[broadcastRoomId] = failedState; broadcastWs(broadcastRoomId, { type: 'progress', key: broadcastKey, progress: failedState }); hlsProcesses.delete(progressKey); + // Clean up temporary m3u8 file + const m3u8Name = isPersistent ? 'temp.m3u8' : `temp_${progressKey.replace(/[^a-zA-Z0-9]/g, '_')}.m3u8`; + const m3u8Path = path.join(hlsDir, m3u8Name); + if (!isPersistent && fs.existsSync(m3u8Path)) { + try { fs.unlinkSync(m3u8Path); } catch (e) {} + } }); + ffmpegCommand.on('progress', (progress) => { const timemarkSeconds = parseTimemarkToSeconds(progress.timemark || '0'); const absoluteSeconds = startTime + (isFinite(timemarkSeconds) ? timemarkSeconds : 0); @@ -1066,8 +1074,13 @@ const startHlsTranscode = async (bucket, key, seg, requestedEncoder, requestedDe progress: finishedState }); hlsProcesses.delete(progressKey); + // Clean up temporary m3u8 file + if (!isPersistent && fs.existsSync(m3u8Path)) { + try { fs.unlinkSync(m3u8Path); } catch (e) {} + } }); + ffmpegCommand.run(); console.log(`[HLS] FFmpeg process started for ${progressKey} (persistent=${isPersistent}, m3u8=${m3u8Name})`); const newProcessInfo = { command: ffmpegCommand, currentSeg: seg, lastActive: Date.now(), persistent: isPersistent, m3u8Name }; @@ -1128,43 +1141,68 @@ app.get('/api/hls/segment.ts', async (req, res) => { const baseProgressKey = getProgressKey(key); const subtitleSuffix = (subtitleIndex !== null && subtitleIndex !== undefined && subtitleIndex !== '-1') ? `-sub${subtitleIndex}` : ''; - const progressKey = `${baseProgressKey}${subtitleSuffix}`; + const progressKeyPrefix = `${baseProgressKey}${subtitleSuffix}`; const hlsDir = getHlsCacheDir(bucket, key, subtitleIndex); if (!fs.existsSync(hlsDir)) fs.mkdirSync(hlsDir, { recursive: true }); const targetSegPath = path.join(hlsDir, `segment_${seg}.ts`); - let currentProcess = hlsProcesses.get(progressKey); + + // Find a process that is "closest" to our segment to avoid redundant side-processes + let currentProcess = null; + let closestKey = null; + for (const [procKey, procInfo] of hlsProcesses.entries()) { + if (procKey === progressKeyPrefix || procKey.startsWith(`${progressKeyPrefix}:`)) { + // Re-use if it's within a window of 2 to 10 segments ahead + if (seg >= (procInfo.currentSeg || 0) && seg <= (procInfo.currentSeg || 0) + 12) { + currentProcess = procInfo; + closestKey = procKey; + break; + } + } + } if (currentProcess) { currentProcess.lastActive = Date.now(); } + const checkIsCachedAndCompleted = () => { if (!fs.existsSync(targetSegPath)) return false; - const m3u8Path = path.join(hlsDir, `temp.m3u8`); - if (fs.existsSync(m3u8Path) && fs.readFileSync(m3u8Path, 'utf8').includes(`segment_${seg}.ts`)) return true; + + // Check all m3u8 files in the directory to see if any process has finished this segment + const files = fs.readdirSync(hlsDir); + for (const file of files) { + if (file.endsWith('.m3u8')) { + const m3u8Content = fs.readFileSync(path.join(hlsDir, file), 'utf8'); + if (m3u8Content.includes(`segment_${seg}.ts`)) return true; + } + } + if (currentProcess && Math.abs((currentProcess.currentSeg || 0) - seg) > 3) return true; - // If there's no active process, any existing file is from a past complete run + // If there's no active process, any existing file is from a past complete run or already verified if (!currentProcess) return true; return false; }; + if (checkIsCachedAndCompleted()) { if (currentProcess) currentProcess.currentSeg = Math.max(currentProcess.currentSeg, seg); res.setHeader('Content-Type', 'video/MP2T'); return res.sendFile(targetSegPath); } - const needsNewProcess = !currentProcess || (!fs.existsSync(targetSegPath) && (seg < (currentProcess.currentSeg || 0) || seg > (currentProcess.currentSeg || 0) + 4)); + // If we have no process or the closest one is too far, start a new one (only if we don't already have the segment) + const needsNewProcess = !currentProcess && !checkIsCachedAndCompleted(); if (needsNewProcess) { - console.log(`[HLS] Starting new FFmpeg process for seg=${seg}, key=${key}`); + console.log(`[HLS] Starting new FFmpeg side-process for seg=${seg}, key=${key}`); currentProcess = await startHlsTranscode(bucket, key, seg, requestedEncoder, requestedDecoder, subtitleIndex); - } else { - console.log(`[HLS] Reusing existing FFmpeg process for seg=${seg}, currentSeg=${currentProcess?.currentSeg}`); + } else if (currentProcess) { + console.log(`[HLS] Reusing existing FFmpeg process (${closestKey}) for seg=${seg}, currently at ${currentProcess.currentSeg}`); } + const ready = await waitForSegment(hlsDir, seg); if (!ready) { console.error(`[HLS] Segment generation timeout: seg=${seg}, key=${key}`);