修改为流式推流
This commit is contained in:
188
server.js
188
server.js
@@ -535,6 +535,194 @@ app.post('/api/transcode', async (req, res) => {
|
||||
}
|
||||
});
|
||||
|
||||
app.get('/api/stream', async (req, res) => {
|
||||
const bucket = req.query.bucket;
|
||||
const key = req.query.key;
|
||||
const codec = req.query.codec;
|
||||
const encoder = req.query.encoder;
|
||||
|
||||
if (!bucket) {
|
||||
return res.status(400).json({ error: 'Bucket name is required' });
|
||||
}
|
||||
if (!key) {
|
||||
return res.status(400).json({ error: 'Video key is required' });
|
||||
}
|
||||
|
||||
const safeCodec = codec === 'h265' ? 'h265' : 'h264';
|
||||
const safeEncoder = ['nvidia', 'intel', 'vaapi', 'neon'].includes(encoder) ? encoder : 'software';
|
||||
const codecMap = {
|
||||
software: { h264: 'libx264', h265: 'libx265' },
|
||||
neon: { h264: 'libx264', h265: 'libx265' },
|
||||
nvidia: { h264: 'h264_nvenc', h265: 'hevc_nvenc' },
|
||||
intel: { h264: 'h264_qsv', h265: 'hevc_qsv' },
|
||||
vaapi: { h264: 'h264_vaapi', h265: 'hevc_vaapi' }
|
||||
};
|
||||
const videoCodec = codecMap[safeEncoder][safeCodec];
|
||||
|
||||
const safeKeySegments = key.split('/').map(segment => segment.replace(/[^a-zA-Z0-9_\-]/g, '_'));
|
||||
const progressKey = safeKeySegments.join('/');
|
||||
const safeBucket = bucket.replace(/[^a-zA-Z0-9_\-]/g, '_');
|
||||
const tmpInputPath = path.join(os.tmpdir(), `s3-input-${safeBucket}-${safeKeySegments.join('-')}.tmp`);
|
||||
const cacheExists = fs.existsSync(tmpInputPath);
|
||||
|
||||
const auth = extractS3Credentials(req);
|
||||
const s3Client = createS3Client(auth);
|
||||
|
||||
try {
|
||||
let totalBytes = 0;
|
||||
let downloadedBytes = 0;
|
||||
|
||||
if (!cacheExists) {
|
||||
const command = new GetObjectCommand({ Bucket: bucket, Key: key });
|
||||
const response = await s3Client.send(command);
|
||||
const s3Stream = response.Body;
|
||||
totalBytes = response.ContentLength || 0;
|
||||
|
||||
progressMap[progressKey] = {
|
||||
status: 'downloading',
|
||||
percent: 0,
|
||||
downloadedBytes: 0,
|
||||
totalBytes,
|
||||
details: 'Downloading full source before streaming...',
|
||||
mp4Url: null
|
||||
};
|
||||
broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] });
|
||||
|
||||
await new Promise((resolve, reject) => {
|
||||
const writeStream = fs.createWriteStream(tmpInputPath);
|
||||
s3Stream.on('data', (chunk) => {
|
||||
downloadedBytes += chunk.length;
|
||||
const percent = totalBytes ? Math.min(100, Math.round(downloadedBytes / totalBytes * 100)) : 0;
|
||||
const downloadState = {
|
||||
status: 'downloading',
|
||||
percent,
|
||||
downloadedBytes,
|
||||
totalBytes,
|
||||
details: totalBytes ? `Downloading source ${percent}%` : 'Downloading source...',
|
||||
mp4Url: null
|
||||
};
|
||||
progressMap[progressKey] = downloadState;
|
||||
broadcastWs(progressKey, { type: 'progress', key, progress: downloadState });
|
||||
});
|
||||
s3Stream.on('error', reject);
|
||||
writeStream.on('error', reject);
|
||||
writeStream.on('finish', resolve);
|
||||
s3Stream.pipe(writeStream);
|
||||
});
|
||||
|
||||
progressMap[progressKey] = {
|
||||
status: 'downloaded',
|
||||
percent: 100,
|
||||
downloadedBytes,
|
||||
totalBytes,
|
||||
details: 'Source download complete, starting real-time transcode...',
|
||||
mp4Url: null
|
||||
};
|
||||
broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] });
|
||||
} else {
|
||||
const stats = fs.statSync(tmpInputPath);
|
||||
totalBytes = stats.size;
|
||||
downloadedBytes = totalBytes;
|
||||
progressMap[progressKey] = {
|
||||
status: 'downloaded',
|
||||
percent: 100,
|
||||
downloadedBytes,
|
||||
totalBytes,
|
||||
details: 'Source already downloaded locally, starting real-time transcode...',
|
||||
mp4Url: null
|
||||
};
|
||||
broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] });
|
||||
}
|
||||
|
||||
res.setHeader('Content-Type', 'video/mp4');
|
||||
res.setHeader('Cache-Control', 'no-cache');
|
||||
res.setHeader('Connection', 'keep-alive');
|
||||
|
||||
let ffmpegCommand = null;
|
||||
|
||||
const startStream = (encoderName) => {
|
||||
const streamingOptions = createFfmpegOptions(encoderName).concat(['-movflags', 'frag_keyframe+empty_moov+faststart']);
|
||||
ffmpegCommand = ffmpeg(tmpInputPath)
|
||||
.videoCodec(encoderName)
|
||||
.audioCodec('aac')
|
||||
.outputOptions(streamingOptions)
|
||||
.format('mp4');
|
||||
|
||||
if (/_vaapi$/.test(encoderName)) {
|
||||
ffmpegCommand
|
||||
.inputOptions(['-vaapi_device', '/dev/dri/renderD128'])
|
||||
.videoFilters('format=nv12,hwupload');
|
||||
}
|
||||
|
||||
transcodeProcesses.set(progressKey, ffmpegCommand);
|
||||
|
||||
ffmpegCommand
|
||||
.on('progress', (progress) => {
|
||||
const progressState = {
|
||||
status: 'transcoding',
|
||||
percent: Math.min(Math.max(Math.round(progress.percent || 0), 0), 100),
|
||||
frame: progress.frames || null,
|
||||
fps: progress.currentFps || null,
|
||||
bitrate: progress.currentKbps || null,
|
||||
timemark: progress.timemark || null,
|
||||
details: `Streaming transcode ${Math.min(Math.max(Math.round(progress.percent || 0), 0), 100)}%`,
|
||||
mp4Url: null
|
||||
};
|
||||
progressMap[progressKey] = progressState;
|
||||
broadcastWs(progressKey, { type: 'progress', key, progress: progressState });
|
||||
})
|
||||
.on('stderr', (stderrLine) => {
|
||||
console.log(`ffmpeg stderr: ${stderrLine}`);
|
||||
})
|
||||
.on('end', () => {
|
||||
transcodeProcesses.delete(progressKey);
|
||||
progressMap[progressKey] = {
|
||||
status: 'finished',
|
||||
percent: 100,
|
||||
details: 'Streaming transcode complete',
|
||||
mp4Url: null
|
||||
};
|
||||
broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] });
|
||||
})
|
||||
.on('error', (err) => {
|
||||
transcodeProcesses.delete(progressKey);
|
||||
const failedState = {
|
||||
status: 'failed',
|
||||
percent: progressMap[progressKey]?.percent || 0,
|
||||
details: err.message || 'Streaming transcode failed',
|
||||
mp4Url: null
|
||||
};
|
||||
progressMap[progressKey] = failedState;
|
||||
broadcastWs(progressKey, { type: 'progress', key, progress: failedState });
|
||||
if (!res.headersSent) {
|
||||
res.status(500).json({ error: 'Failed to stream transcoded video', detail: err.message });
|
||||
} else {
|
||||
res.destroy(err);
|
||||
}
|
||||
});
|
||||
|
||||
ffmpegCommand.pipe(res, { end: true });
|
||||
};
|
||||
|
||||
res.on('close', () => {
|
||||
if (ffmpegCommand && typeof ffmpegCommand.kill === 'function') {
|
||||
try {
|
||||
ffmpegCommand.kill('SIGKILL');
|
||||
} catch (_) {}
|
||||
}
|
||||
});
|
||||
|
||||
startStream(videoCodec);
|
||||
} catch (error) {
|
||||
console.error('Error in stream:', error);
|
||||
if (!res.headersSent) {
|
||||
res.status(500).json({ error: 'Failed to stream video', detail: error.message });
|
||||
} else {
|
||||
res.destroy(error);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Status check for MP4 availability
|
||||
app.get('/api/status', (req, res) => {
|
||||
const { key } = req.query;
|
||||
|
||||
Reference in New Issue
Block a user