From 23c052bc762e0a17558d2ba7e64aba0b2de3a1f2 Mon Sep 17 00:00:00 2001 From: CN-JS-HuiBai Date: Fri, 10 Apr 2026 00:51:40 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=86=B2=E7=AA=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server.js | 85 +++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 60 insertions(+), 25 deletions(-) diff --git a/server.js b/server.js index 93621bc..12bb6ba 100644 --- a/server.js +++ b/server.js @@ -353,8 +353,9 @@ const sanitizeNumber = (value) => { return Number.isFinite(num) ? num : null; }; -const stopActiveTranscode = (progressKey) => { - const activeProcess = transcodeProcesses.get(progressKey); +const stopActiveTranscode = (progressKey, sessionId = null) => { + const processKey = sessionId ? `${progressKey}:${sessionId}` : progressKey; + const activeProcess = transcodeProcesses.get(processKey); if (!activeProcess?.command) { return false; } @@ -370,9 +371,9 @@ const stopActiveTranscode = (progressKey) => { activeProcess.command.kill('SIGKILL'); } } catch (killError) { - console.warn(`Failed to kill transcode process for ${progressKey}:`, killError); + console.warn(`Failed to kill transcode process for ${processKey}:`, killError); } finally { - transcodeProcesses.delete(progressKey); + transcodeProcesses.delete(processKey); } return true; @@ -442,11 +443,19 @@ const ensureS3Downloaded = async (s3Client, bucket, key, tmpInputPath, progressK if (activeDownloads.has(progressKey)) { try { await activeDownloads.get(progressKey); + return; // Already downloaded by another request } catch (err) { // Ignore error and retry if previous failed } } + const downloadingPath = tmpInputPath + '.downloading'; + // Cleanup stale downloading file if no active download is tracked + if (fs.existsSync(downloadingPath)) { + fs.rmSync(downloadingPath, { force: true }); + } + + let shouldDownload = true; let s3Metadata = null; @@ -752,17 +761,19 @@ const HLS_SEGMENT_TIME = 6; const waitForSegment = async (hlsDir, segIndex, timeoutMs = 45000) => { const start = Date.now(); const segPath = path.join(hlsDir, `segment_${segIndex}.ts`); - const m3u8Path = path.join(hlsDir, `temp.m3u8`); - + + // Check all m3u8 files in the directory to see if any process has finished this segment while (Date.now() - start < timeoutMs) { - if (fs.existsSync(m3u8Path)) { - const m3u8Content = fs.readFileSync(m3u8Path, 'utf8'); - if (m3u8Content.includes(`segment_${segIndex}.ts`)) { - return true; - } - if (m3u8Content.includes(`#EXT-X-ENDLIST`)) { - if (fs.existsSync(segPath)) return true; - return false; + if (fs.existsSync(segPath)) { + // Even if .ts exists, ensure it's logged in some m3u8 to confirm it's ready + const files = fs.readdirSync(hlsDir); + for (const file of files) { + if (file.endsWith('.m3u8')) { + const m3u8Content = fs.readFileSync(path.join(hlsDir, file), 'utf8'); + if (m3u8Content.includes(`segment_${segIndex}.ts`)) { + return true; + } + } } } await new Promise(r => setTimeout(r, 200)); @@ -879,10 +890,11 @@ setInterval(() => { } }, 10000); -const startHlsTranscode = async (bucket, key, seg, requestedEncoder, requestedDecoder, subtitleIndex, isPersistent = false) => { +const startHlsTranscode = async (bucket, key, seg, requestedEncoder, requestedDecoder, subtitleIndex, isPersistent = false, overrideProgressKey = null) => { const baseProgressKey = getProgressKey(key); const subtitleSuffix = (subtitleIndex !== null && subtitleIndex !== undefined && subtitleIndex !== '-1') ? `-sub${subtitleIndex}` : ''; - const progressKey = `${baseProgressKey}${subtitleSuffix}`; + const progressKey = overrideProgressKey || `${baseProgressKey}${subtitleSuffix}`; + const hlsDir = getHlsCacheDir(bucket, key, subtitleIndex); const tmpInputPath = getInputCachePath(bucket, key); @@ -890,10 +902,22 @@ const startHlsTranscode = async (bucket, key, seg, requestedEncoder, requestedDe let currentProcess = hlsProcesses.get(progressKey); if (currentProcess && currentProcess.command) { + // If the current process is persistent and we are starting a non-persistent one, + // don't kill it. We'll spawn a side-process instead. + if (currentProcess.persistent && !isPersistent) { + console.log(`[HLS] Persistent process running for ${progressKey}, spawning side-process for seek.`); + // Change progressKey for the side-process to avoid killing the persistent one + // Use a session-specific or random suffix + const sideKey = `${progressKey}:side:${Date.now()}`; + const sideProcess = await startHlsTranscode(bucket, key, seg, requestedEncoder, requestedDecoder, subtitleIndex, false, sideKey); + return sideProcess; + } + console.log(`[HLS] Killing previous FFmpeg process for ${progressKey}`); try { currentProcess.command.kill('SIGKILL'); } catch (e) { } } + const startTime = Math.max(0, seg * HLS_SEGMENT_TIME); let sourceMetadata = null; @@ -908,9 +932,11 @@ const startHlsTranscode = async (bucket, key, seg, requestedEncoder, requestedDe console.log(`[HLS] FFmpeg config: encoder=${encoderName}, decoder=${decoderName}, startTime=${startTime}s, subtitleIndex=${subtitleIndex}, persistent=${isPersistent}`); - const m3u8Path = path.join(hlsDir, `temp.m3u8`); + const m3u8Name = isPersistent ? 'temp.m3u8' : `temp_${progressKey.replace(/[^a-zA-Z0-9]/g, '_')}.m3u8`; + const m3u8Path = path.join(hlsDir, m3u8Name); if (fs.existsSync(m3u8Path)) fs.unlinkSync(m3u8Path); + const ffmpegCommand = ffmpeg().input(tmpInputPath); if (startTime > 0) ffmpegCommand.seekInput(startTime); @@ -996,8 +1022,11 @@ const startHlsTranscode = async (bucket, key, seg, requestedEncoder, requestedDe details: isPersistent ? `后台预切片中 ${percent}%` : `处理进度 ${percent}%`, mp4Url: null }; + const broadcastRoomId = `${baseProgressKey}${subtitleSuffix}`; const broadcastKey = subtitleIndex && subtitleIndex !== '-1' ? `${key}-sub${subtitleIndex}` : key; - broadcastWs(progressKey, { type: 'progress', key: broadcastKey, progress: progressState }); + broadcastWs(broadcastRoomId, { type: 'progress', key: broadcastKey, progress: progressState }); + + if (!isPersistent) { console.log(`[FFmpeg] ${progressKey} | ${progress.timemark} | ${sanitizeNumber(progress.currentFps) ?? '-'}fps | ${sanitizeNumber(progress.currentKbps) ?? '-'}kbps | ${percent}%`); @@ -1006,8 +1035,9 @@ const startHlsTranscode = async (bucket, key, seg, requestedEncoder, requestedDe ffmpegCommand.on('end', () => { console.log(`[FFmpeg] ${progressKey} HLS transcode completed.`); + const broadcastRoomId = `${baseProgressKey}${subtitleSuffix}`; const broadcastKey = subtitleIndex && subtitleIndex !== '-1' ? `${key}-sub${subtitleIndex}` : key; - broadcastWs(progressKey, { + broadcastWs(broadcastRoomId, { type: 'progress', key: broadcastKey, progress: { @@ -1020,8 +1050,8 @@ const startHlsTranscode = async (bucket, key, seg, requestedEncoder, requestedDe }); ffmpegCommand.run(); - console.log(`[HLS] FFmpeg process started for ${progressKey} (persistent=${isPersistent})`); - const newProcessInfo = { command: ffmpegCommand, currentSeg: seg, lastActive: Date.now(), persistent: isPersistent }; + console.log(`[HLS] FFmpeg process started for ${progressKey} (persistent=${isPersistent}, m3u8=${m3u8Name})`); + const newProcessInfo = { command: ffmpegCommand, currentSeg: seg, lastActive: Date.now(), persistent: isPersistent, m3u8Name }; hlsProcesses.set(progressKey, newProcessInfo); return newProcessInfo; }; @@ -1160,7 +1190,8 @@ app.get('/api/stream', async (req, res) => { const s3Client = createS3Client(auth); try { - const replacedExistingStream = stopActiveTranscode(progressKey); + const replacedExistingStream = stopActiveTranscode(progressKey, streamSessionId); + if (replacedExistingStream && startSeconds > 0) { progressMap[progressKey] = { ...(progressMap[progressKey] || {}), @@ -1229,7 +1260,9 @@ app.get('/api/stream', async (req, res) => { ffmpegCommand.inputOptions(['-c:v', resolvedDecoderName]); } - transcodeProcesses.set(progressKey, { command: ffmpegCommand, streamSessionId }); + const processKey = `${progressKey}:${streamSessionId}`; + transcodeProcesses.set(processKey, { command: ffmpegCommand, streamSessionId }); + ffmpegCommand .on('progress', (progress) => { @@ -1313,9 +1346,11 @@ app.get('/api/stream', async (req, res) => { ffmpegCommand.kill('SIGKILL'); } catch (_) { } } - if (transcodeProcesses.get(progressKey)?.streamSessionId === streamSessionId) { - transcodeProcesses.delete(progressKey); + const processKey = `${progressKey}:${streamSessionId}`; + if (transcodeProcesses.get(processKey)?.streamSessionId === streamSessionId) { + transcodeProcesses.delete(processKey); } + }); startStream(videoEncoder, requestedVideoDecoder);