From fce0e3d58180b47f92c59c7f1173cefea0df9b8b Mon Sep 17 00:00:00 2001 From: CN-JS-HuiBai Date: Fri, 10 Apr 2026 00:03:11 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=9D=E8=AF=95=E4=BF=AE=E5=A4=8D=E6=97=A0?= =?UTF-8?q?=E6=B3=95=E8=8E=B7=E5=8F=96=E4=B8=8B=E8=BD=BD=E8=BF=9B=E5=BA=A6?= =?UTF-8?q?=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server.js | 62 ++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 45 insertions(+), 17 deletions(-) diff --git a/server.js b/server.js index 2b1a60e..99b5247 100644 --- a/server.js +++ b/server.js @@ -205,12 +205,15 @@ const broadcastWs = (key, payload) => { // If this is a base key (not a sub-key), also broadcast to all its sub-keys if (!key.includes('-sub')) { - for (const [subKey, subClients] of wsSubscriptions.entries()) { - if (subKey.startsWith(`${key}-sub`)) { + // Find the original raw key from the payload to reconstruct sub-keys + const rawKey = payload.key; + for (const [subRoomId, subClients] of wsSubscriptions.entries()) { + if (subRoomId.startsWith(`${key}-sub`)) { + // Extract the suffix from the room ID + const suffix = subRoomId.substring(key.length); // e.g., "-sub1" for (const client of subClients) { if (client.readyState === WebSocket.OPEN) { - // We need to keep the payload key matching the room key for filtering on the client side - const subPayload = { ...payload, key: subKey }; + const subPayload = { ...payload, key: `${rawKey}${suffix}` }; client.send(JSON.stringify(subPayload)); } } @@ -410,7 +413,13 @@ wss.on('connection', (ws) => { } addWsClient(progressKey, ws); - const currentProgress = progressMap[progressKey]; + // Check for current progress, fallback to base key if this is a sub-key and no specific progress exists + let currentProgress = progressMap[progressKey]; + if (!currentProgress && progressKey.includes('-sub')) { + const baseKey = progressKey.split('-sub')[0]; + currentProgress = progressMap[baseKey]; + } + if (currentProgress) { if (typeof currentProgress.duration === 'number' && currentProgress.duration > 0) { ws.send(JSON.stringify({ type: 'duration', key: message.key, duration: currentProgress.duration })); @@ -496,21 +505,28 @@ const ensureS3Downloaded = async (s3Client, bucket, key, tmpInputPath, progressK await new Promise((resolve, reject) => { const writeStream = fs.createWriteStream(downloadingPath); + let lastBroadcastTime = 0; s3Stream.on('data', (chunk) => { downloadedBytes += chunk.length; - const percent = totalBytes ? Math.min(100, Math.round((downloadedBytes / totalBytes) * 100)) : 0; - const downloadState = { - status: 'downloading', - percent, - downloadedBytes, - totalBytes, - streamSessionId, - details: totalBytes ? `Downloading source ${percent}%` : 'Downloading source...', - mp4Url: null - }; - progressMap[progressKey] = downloadState; - broadcastWs(progressKey, { type: 'progress', key, progress: downloadState }); + const now = Date.now(); + + // Throttle broadcasts to max once every 100ms + if (now - lastBroadcastTime > 100 || downloadedBytes === totalBytes) { + const percent = totalBytes ? Math.min(100, Math.round((downloadedBytes / totalBytes) * 100)) : 0; + const downloadState = { + status: 'downloading', + percent, + downloadedBytes, + totalBytes, + streamSessionId, + details: totalBytes ? `正在从S3下载源文件 ${percent}%` : '正在从S3下载源文件...', + mp4Url: null + }; + progressMap[progressKey] = downloadState; + broadcastWs(progressKey, { type: 'progress', key, progress: downloadState }); + lastBroadcastTime = now; + } }); s3Stream.on('error', reject); @@ -875,6 +891,18 @@ app.get('/api/hls/segment.ts', async (req, res) => { console.log(`[HLS] Segment request: seg=${seg}, key=${key}, encoder=${requestedEncoder}, sub=${subtitleIndex}`); const tmpInputPath = getInputCachePath(bucket, key); + + // Ensure the video is fully downloaded before attempting to transcode segments + const auth = await extractS3Credentials(req); + const s3Client = createS3Client(auth); + const baseProgressKeyForDownload = getProgressKey(key); + const streamSessionId = createStreamSessionId(); + try { + await ensureS3Downloaded(s3Client, bucket, key, tmpInputPath, baseProgressKeyForDownload, streamSessionId); + } catch (err) { + console.error('S3 Download Failed for segment:', err); + return res.status(500).send('S3 Download Failed'); + } const baseProgressKey = getProgressKey(key); const subtitleSuffix = (subtitleIndex !== null && subtitleIndex !== undefined && subtitleIndex !== '-1') ? `-sub${subtitleIndex}` : '';