diff --git a/server.js b/server.js index 5f3a3f4..0d2122a 100644 --- a/server.js +++ b/server.js @@ -535,6 +535,194 @@ app.post('/api/transcode', async (req, res) => { } }); +app.get('/api/stream', async (req, res) => { + const bucket = req.query.bucket; + const key = req.query.key; + const codec = req.query.codec; + const encoder = req.query.encoder; + + if (!bucket) { + return res.status(400).json({ error: 'Bucket name is required' }); + } + if (!key) { + return res.status(400).json({ error: 'Video key is required' }); + } + + const safeCodec = codec === 'h265' ? 'h265' : 'h264'; + const safeEncoder = ['nvidia', 'intel', 'vaapi', 'neon'].includes(encoder) ? encoder : 'software'; + const codecMap = { + software: { h264: 'libx264', h265: 'libx265' }, + neon: { h264: 'libx264', h265: 'libx265' }, + nvidia: { h264: 'h264_nvenc', h265: 'hevc_nvenc' }, + intel: { h264: 'h264_qsv', h265: 'hevc_qsv' }, + vaapi: { h264: 'h264_vaapi', h265: 'hevc_vaapi' } + }; + const videoCodec = codecMap[safeEncoder][safeCodec]; + + const safeKeySegments = key.split('/').map(segment => segment.replace(/[^a-zA-Z0-9_\-]/g, '_')); + const progressKey = safeKeySegments.join('/'); + const safeBucket = bucket.replace(/[^a-zA-Z0-9_\-]/g, '_'); + const tmpInputPath = path.join(os.tmpdir(), `s3-input-${safeBucket}-${safeKeySegments.join('-')}.tmp`); + const cacheExists = fs.existsSync(tmpInputPath); + + const auth = extractS3Credentials(req); + const s3Client = createS3Client(auth); + + try { + let totalBytes = 0; + let downloadedBytes = 0; + + if (!cacheExists) { + const command = new GetObjectCommand({ Bucket: bucket, Key: key }); + const response = await s3Client.send(command); + const s3Stream = response.Body; + totalBytes = response.ContentLength || 0; + + progressMap[progressKey] = { + status: 'downloading', + percent: 0, + downloadedBytes: 0, + totalBytes, + details: 'Downloading full source before streaming...', + mp4Url: null + }; + broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] }); + + await new Promise((resolve, reject) => { + const writeStream = fs.createWriteStream(tmpInputPath); + s3Stream.on('data', (chunk) => { + downloadedBytes += chunk.length; + const percent = totalBytes ? Math.min(100, Math.round(downloadedBytes / totalBytes * 100)) : 0; + const downloadState = { + status: 'downloading', + percent, + downloadedBytes, + totalBytes, + details: totalBytes ? `Downloading source ${percent}%` : 'Downloading source...', + mp4Url: null + }; + progressMap[progressKey] = downloadState; + broadcastWs(progressKey, { type: 'progress', key, progress: downloadState }); + }); + s3Stream.on('error', reject); + writeStream.on('error', reject); + writeStream.on('finish', resolve); + s3Stream.pipe(writeStream); + }); + + progressMap[progressKey] = { + status: 'downloaded', + percent: 100, + downloadedBytes, + totalBytes, + details: 'Source download complete, starting real-time transcode...', + mp4Url: null + }; + broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] }); + } else { + const stats = fs.statSync(tmpInputPath); + totalBytes = stats.size; + downloadedBytes = totalBytes; + progressMap[progressKey] = { + status: 'downloaded', + percent: 100, + downloadedBytes, + totalBytes, + details: 'Source already downloaded locally, starting real-time transcode...', + mp4Url: null + }; + broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] }); + } + + res.setHeader('Content-Type', 'video/mp4'); + res.setHeader('Cache-Control', 'no-cache'); + res.setHeader('Connection', 'keep-alive'); + + let ffmpegCommand = null; + + const startStream = (encoderName) => { + const streamingOptions = createFfmpegOptions(encoderName).concat(['-movflags', 'frag_keyframe+empty_moov+faststart']); + ffmpegCommand = ffmpeg(tmpInputPath) + .videoCodec(encoderName) + .audioCodec('aac') + .outputOptions(streamingOptions) + .format('mp4'); + + if (/_vaapi$/.test(encoderName)) { + ffmpegCommand + .inputOptions(['-vaapi_device', '/dev/dri/renderD128']) + .videoFilters('format=nv12,hwupload'); + } + + transcodeProcesses.set(progressKey, ffmpegCommand); + + ffmpegCommand + .on('progress', (progress) => { + const progressState = { + status: 'transcoding', + percent: Math.min(Math.max(Math.round(progress.percent || 0), 0), 100), + frame: progress.frames || null, + fps: progress.currentFps || null, + bitrate: progress.currentKbps || null, + timemark: progress.timemark || null, + details: `Streaming transcode ${Math.min(Math.max(Math.round(progress.percent || 0), 0), 100)}%`, + mp4Url: null + }; + progressMap[progressKey] = progressState; + broadcastWs(progressKey, { type: 'progress', key, progress: progressState }); + }) + .on('stderr', (stderrLine) => { + console.log(`ffmpeg stderr: ${stderrLine}`); + }) + .on('end', () => { + transcodeProcesses.delete(progressKey); + progressMap[progressKey] = { + status: 'finished', + percent: 100, + details: 'Streaming transcode complete', + mp4Url: null + }; + broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] }); + }) + .on('error', (err) => { + transcodeProcesses.delete(progressKey); + const failedState = { + status: 'failed', + percent: progressMap[progressKey]?.percent || 0, + details: err.message || 'Streaming transcode failed', + mp4Url: null + }; + progressMap[progressKey] = failedState; + broadcastWs(progressKey, { type: 'progress', key, progress: failedState }); + if (!res.headersSent) { + res.status(500).json({ error: 'Failed to stream transcoded video', detail: err.message }); + } else { + res.destroy(err); + } + }); + + ffmpegCommand.pipe(res, { end: true }); + }; + + res.on('close', () => { + if (ffmpegCommand && typeof ffmpegCommand.kill === 'function') { + try { + ffmpegCommand.kill('SIGKILL'); + } catch (_) {} + } + }); + + startStream(videoCodec); + } catch (error) { + console.error('Error in stream:', error); + if (!res.headersSent) { + res.status(500).json({ error: 'Failed to stream video', detail: error.message }); + } else { + res.destroy(error); + } + } +}); + // Status check for MP4 availability app.get('/api/status', (req, res) => { const { key } = req.query;