diff --git a/server.js b/server.js index a4ef5d0..b55dbd2 100644 --- a/server.js +++ b/server.js @@ -142,6 +142,7 @@ const createS3Client = (credentials) => { const progressMap = {}; const transcodeProcesses = new Map(); const wsSubscriptions = new Map(); +const activeDownloads = new Map(); const AVAILABLE_VIDEO_ENCODERS = [ { value: 'h264_rkmpp', label: 'H.264(RKMPP HighSpeed)' }, @@ -386,6 +387,107 @@ wss.on('connection', (ws) => { ws.on('close', () => removeWsClient(ws)); }); +const ensureS3Downloaded = async (s3Client, bucket, key, tmpInputPath, progressKey, streamSessionId) => { + if (activeDownloads.has(progressKey)) { + try { + await activeDownloads.get(progressKey); + } catch (err) { + // Ignore error and retry if previous failed + } + } + + if (fs.existsSync(tmpInputPath)) { + const stats = fs.statSync(tmpInputPath); + const totalBytes = stats.size; + progressMap[progressKey] = { + status: 'downloaded', + percent: 100, + downloadedBytes: totalBytes, + totalBytes, + streamSessionId, + details: 'Source already downloaded locally...', + mp4Url: null + }; + broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] }); + return; + } + + const downloadPromise = (async () => { + const command = new GetObjectCommand({ Bucket: bucket, Key: key }); + const response = await s3Client.send(command); + const totalBytes = response.ContentLength || 0; + const s3Stream = response.Body; + let downloadedBytes = 0; + + const downloadingPath = tmpInputPath + '.downloading'; + + progressMap[progressKey] = { + status: 'downloading', + percent: 0, + downloadedBytes: 0, + totalBytes, + streamSessionId, + details: 'Downloading full source...', + mp4Url: null + }; + broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] }); + + await new Promise((resolve, reject) => { + const writeStream = fs.createWriteStream(downloadingPath); + + s3Stream.on('data', (chunk) => { + downloadedBytes += chunk.length; + const percent = totalBytes ? Math.min(100, Math.round((downloadedBytes / totalBytes) * 100)) : 0; + const downloadState = { + status: 'downloading', + percent, + downloadedBytes, + totalBytes, + streamSessionId, + details: totalBytes ? `Downloading source ${percent}%` : 'Downloading source...', + mp4Url: null + }; + progressMap[progressKey] = downloadState; + broadcastWs(progressKey, { type: 'progress', key, progress: downloadState }); + }); + + s3Stream.on('error', reject); + writeStream.on('error', reject); + writeStream.on('finish', () => { + try { + fs.renameSync(downloadingPath, tmpInputPath); + resolve(); + } catch (e) { + reject(e); + } + }); + s3Stream.pipe(writeStream); + }); + + progressMap[progressKey] = { + status: 'downloaded', + percent: 100, + downloadedBytes, + totalBytes, + streamSessionId, + details: 'Source download complete...', + mp4Url: null + }; + broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] }); + })(); + + activeDownloads.set(progressKey, downloadPromise); + try { + await downloadPromise; + } catch (err) { + if (fs.existsSync(tmpInputPath + '.downloading')) { + fs.rmSync(tmpInputPath + '.downloading', { force: true }); + } + throw err; + } finally { + activeDownloads.delete(progressKey); + } +}; const clearDownloadCache = () => { const tmpDir = CACHE_DIR; @@ -575,84 +677,14 @@ app.get('/api/hls/playlist.m3u8', async (req, res) => { const auth = await extractS3Credentials(req); const s3Client = createS3Client(auth); - let totalBytes = 0; const progressKey = getProgressKey(key); const streamSessionId = createStreamSessionId(); - let downloadedBytes = 0; - if (!fs.existsSync(tmpInputPath)) { - try { - const command = new GetObjectCommand({ Bucket: bucket, Key: key }); - const response = await s3Client.send(command); - totalBytes = response.ContentLength; - const s3Stream = response.Body; - - progressMap[progressKey] = { - status: 'downloading', - percent: 0, - downloadedBytes: 0, - totalBytes, - streamSessionId, - details: 'Downloading full source before streaming...', - mp4Url: null - }; - broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] }); - - await new Promise((resolve, reject) => { - const writeStream = fs.createWriteStream(tmpInputPath); - - s3Stream.on('data', (chunk) => { - downloadedBytes += chunk.length; - const percent = totalBytes ? Math.min(100, Math.round((downloadedBytes / totalBytes) * 100)) : 0; - - const downloadState = { - status: 'downloading', - percent, - downloadedBytes, - totalBytes, - streamSessionId, - details: totalBytes ? `Downloading source ${percent}%` : 'Downloading source...', - mp4Url: null - }; - progressMap[progressKey] = downloadState; - broadcastWs(progressKey, { type: 'progress', key, progress: downloadState }); - }); - - s3Stream.on('error', reject); - writeStream.on('error', reject); - writeStream.on('finish', resolve); - s3Stream.pipe(writeStream); - }); - - progressMap[progressKey] = { - status: 'downloaded', - percent: 100, - downloadedBytes, - totalBytes, - streamSessionId, - details: 'Source download complete, parsing for HLS...', - mp4Url: null - }; - broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] }); - - } catch (err) { - console.error('S3 Download Failed:', err); - return res.status(500).send('S3 Download Failed'); - } - } else { - const stats = fs.statSync(tmpInputPath); - totalBytes = stats.size; - downloadedBytes = totalBytes; - progressMap[progressKey] = { - status: 'downloaded', - percent: 100, - downloadedBytes, - totalBytes, - streamSessionId, - details: 'Source already downloaded locally, parsing for HLS...', - mp4Url: null - }; - broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] }); + try { + await ensureS3Downloaded(s3Client, bucket, key, tmpInputPath, progressKey, streamSessionId); + } catch (err) { + console.error('S3 Download Failed:', err); + return res.status(500).send('S3 Download Failed'); } let duration = 0; @@ -881,74 +913,7 @@ app.get('/api/stream', async (req, res) => { broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] }); } - let totalBytes = 0; - let downloadedBytes = 0; - - if (!cacheExists) { - const command = new GetObjectCommand({ Bucket: bucket, Key: key }); - const response = await s3Client.send(command); - const s3Stream = response.Body; - totalBytes = response.ContentLength || 0; - - progressMap[progressKey] = { - status: 'downloading', - percent: 0, - downloadedBytes: 0, - totalBytes, - streamSessionId, - details: 'Downloading full source before streaming...', - mp4Url: null - }; - broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] }); - - await new Promise((resolve, reject) => { - const writeStream = fs.createWriteStream(tmpInputPath); - s3Stream.on('data', (chunk) => { - downloadedBytes += chunk.length; - const percent = totalBytes ? Math.min(100, Math.round(downloadedBytes / totalBytes * 100)) : 0; - const downloadState = { - status: 'downloading', - percent, - downloadedBytes, - totalBytes, - streamSessionId, - details: totalBytes ? `Downloading source ${percent}%` : 'Downloading source...', - mp4Url: null - }; - progressMap[progressKey] = downloadState; - broadcastWs(progressKey, { type: 'progress', key, progress: downloadState }); - }); - s3Stream.on('error', reject); - writeStream.on('error', reject); - writeStream.on('finish', resolve); - s3Stream.pipe(writeStream); - }); - - progressMap[progressKey] = { - status: 'downloaded', - percent: 100, - downloadedBytes, - totalBytes, - streamSessionId, - details: 'Source download complete, starting real-time transcode...', - mp4Url: null - }; - broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] }); - } else { - const stats = fs.statSync(tmpInputPath); - totalBytes = stats.size; - downloadedBytes = totalBytes; - progressMap[progressKey] = { - status: 'downloaded', - percent: 100, - downloadedBytes, - totalBytes, - streamSessionId, - details: 'Source already downloaded locally, starting real-time transcode...', - mp4Url: null - }; - broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] }); - } + await ensureS3Downloaded(s3Client, bucket, key, tmpInputPath, progressKey, streamSessionId); // Probe file for duration and broadcast to clients let sourceMetadata = null;