From 84c5b8f8583a52cf445ee4d31ecf6e9357d710b0 Mon Sep 17 00:00:00 2001 From: CN-JS-HuiBai Date: Fri, 3 Apr 2026 22:16:32 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E6=97=A5=E5=BF=97=E8=AE=B0?= =?UTF-8?q?=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server.js | 176 +++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 127 insertions(+), 49 deletions(-) diff --git a/server.js b/server.js index 4859897..ba273d9 100644 --- a/server.js +++ b/server.js @@ -2,9 +2,9 @@ const express = require('express'); const cors = require('cors'); const dotenv = require('dotenv'); const fs = require('fs'); -const os = require('os'); const path = require('path'); const http = require('http'); +const { PassThrough } = require('stream'); const WebSocket = require('ws'); const ffmpeg = require('fluent-ffmpeg'); const { S3Client, ListBucketsCommand, ListObjectsV2Command, GetObjectCommand } = require('@aws-sdk/client-s3'); @@ -51,6 +51,19 @@ const defaultS3ClientConfig = { forcePathStyle: process.env.S3_FORCE_PATH_STYLE === 'true' }; +const PROJECT_ROOT = __dirname; +const DOWNLOADS_DIR = path.join(PROJECT_ROOT, 'Downloads'); +const CONVERT_DIR = path.join(PROJECT_ROOT, 'convert'); + +const ensureDirectory = (dirPath) => { + if (!fs.existsSync(dirPath)) { + fs.mkdirSync(dirPath, { recursive: true }); + } +}; + +ensureDirectory(DOWNLOADS_DIR); +ensureDirectory(CONVERT_DIR); + const createS3Client = (credentials) => { const clientConfig = { ...defaultS3ClientConfig }; if (credentials && credentials.username && credentials.password) { @@ -86,17 +99,23 @@ const AVAILABLE_VIDEO_DECODERS = [ const SUPPORTED_TEXT_SUBTITLE_CODECS = new Set(['subrip', 'srt', 'ass', 'ssa', 'mov_text', 'text', 'webvtt']); const getProgressKey = (key) => key.split('/').map(segment => segment.replace(/[^a-zA-Z0-9_\-]/g, '_')).join('/'); +const sanitizePathSegment = (value) => value.replace(/[^a-zA-Z0-9._-]/g, '_'); const createStreamSessionId = () => `${Date.now()}-${Math.random().toString(36).slice(2, 10)}`; const getCachePathParts = (bucket, key) => { - const safeKeySegments = key.split('/').map(segment => segment.replace(/[^a-zA-Z0-9_\-]/g, '_')); + const safeKeySegments = key.split('/').map(segment => sanitizePathSegment(segment)); const progressKey = safeKeySegments.join('/'); - const safeBucket = bucket.replace(/[^a-zA-Z0-9_\-]/g, '_'); - const tmpInputPath = path.join(os.tmpdir(), `s3-input-${safeBucket}-${safeKeySegments.join('-')}.tmp`); + const safeBucket = sanitizePathSegment(bucket); + const downloadFileName = `${safeBucket}--${safeKeySegments.join('--')}`; + const downloadPath = path.join(DOWNLOADS_DIR, downloadFileName); + const convertBaseName = `${safeBucket}--${safeKeySegments.join('--')}`; + const convertPath = path.join(CONVERT_DIR, `${convertBaseName}.tmp`); return { progressKey, safeKeySegments, - tmpInputPath + safeBucket, + downloadPath, + convertPath }; }; @@ -217,9 +236,10 @@ const shouldRetryWithSoftware = (message) => { return /Cannot load libcuda\.so\.1|Could not open encoder before EOF|Error while opening encoder|Operation not permitted|Invalid argument|mpp_create|rkmpp/i.test(message); }; -const ensureSourceCached = async ({ s3Client, bucket, key, tmpInputPath, onProgress }) => { - if (fs.existsSync(tmpInputPath)) { - const stats = fs.statSync(tmpInputPath); +const ensureSourceCached = async ({ s3Client, bucket, key, targetPath, onProgress, logger = console }) => { + if (fs.existsSync(targetPath)) { + const stats = fs.statSync(targetPath); + logger.log(`[download] cache hit bucket=${bucket} key=${key} path=${targetPath} bytes=${stats.size}`); return { totalBytes: stats.size, downloadedBytes: stats.size, @@ -227,35 +247,45 @@ const ensureSourceCached = async ({ s3Client, bucket, key, tmpInputPath, onProgr }; } + logger.log(`[download] request start bucket=${bucket} key=${key} target=${targetPath}`); const command = new GetObjectCommand({ Bucket: bucket, Key: key }); - const response = await s3Client.send(command); - const s3Stream = response.Body; - const totalBytes = response.ContentLength || 0; - let downloadedBytes = 0; + try { + const response = await s3Client.send(command); + const s3Stream = response.Body; + const totalBytes = response.ContentLength || 0; + let downloadedBytes = 0; - if (typeof onProgress === 'function') { - onProgress({ totalBytes, downloadedBytes, cacheExists: false }); - } + logger.log(`[download] response received bucket=${bucket} key=${key} totalBytes=${totalBytes || 0}`); - await new Promise((resolve, reject) => { - const writeStream = fs.createWriteStream(tmpInputPath); - s3Stream.on('data', (chunk) => { - downloadedBytes += chunk.length; - if (typeof onProgress === 'function') { - onProgress({ totalBytes, downloadedBytes, cacheExists: false }); - } + if (typeof onProgress === 'function') { + onProgress({ totalBytes, downloadedBytes, cacheExists: false }); + } + + await new Promise((resolve, reject) => { + const writeStream = fs.createWriteStream(targetPath); + s3Stream.on('data', (chunk) => { + downloadedBytes += chunk.length; + if (typeof onProgress === 'function') { + onProgress({ totalBytes, downloadedBytes, cacheExists: false }); + } + }); + s3Stream.on('error', reject); + writeStream.on('error', reject); + writeStream.on('finish', resolve); + s3Stream.pipe(writeStream); }); - s3Stream.on('error', reject); - writeStream.on('error', reject); - writeStream.on('finish', resolve); - s3Stream.pipe(writeStream); - }); - return { - totalBytes, - downloadedBytes, - cacheExists: false - }; + logger.log(`[download] request complete bucket=${bucket} key=${key} path=${targetPath} downloadedBytes=${downloadedBytes} totalBytes=${totalBytes || 0}`); + + return { + totalBytes, + downloadedBytes, + cacheExists: false + }; + } catch (error) { + logger.error(`[download] request failed bucket=${bucket} key=${key} target=${targetPath}:`, error); + throw error; + } }; const getSubtitleTracks = (metadata) => { @@ -385,13 +415,12 @@ wss.on('connection', (ws) => { const clearDownloadCache = () => { - const tmpDir = os.tmpdir(); try { - if (!fs.existsSync(tmpDir)) return; - const files = fs.readdirSync(tmpDir); + ensureDirectory(DOWNLOADS_DIR); + const files = fs.readdirSync(DOWNLOADS_DIR); for (const file of files) { - if (file.startsWith('s3-input-') && file.endsWith('.tmp')) { - const filePath = path.join(tmpDir, file); + const filePath = path.join(DOWNLOADS_DIR, file); + if (fs.statSync(filePath).isFile()) { fs.rmSync(filePath, { force: true }); } } @@ -401,6 +430,22 @@ const clearDownloadCache = () => { } }; +const clearConvertCache = () => { + try { + ensureDirectory(CONVERT_DIR); + const files = fs.readdirSync(CONVERT_DIR); + for (const file of files) { + const filePath = path.join(CONVERT_DIR, file); + if (fs.statSync(filePath).isFile()) { + fs.rmSync(filePath, { force: true }); + } + } + } catch (err) { + console.error('Failed to clear convert cache:', err); + throw err; + } +}; + // Endpoint to list available buckets app.get('/api/buckets', async (req, res) => { @@ -487,14 +532,14 @@ app.get('/api/subtitles', async (req, res) => { return res.status(400).json({ error: 'Video key is required' }); } - const { tmpInputPath } = getCachePathParts(bucket, key); + const { downloadPath } = getCachePathParts(bucket, key); const auth = extractS3Credentials(req); const s3Client = createS3Client(auth); try { - await ensureSourceCached({ s3Client, bucket, key, tmpInputPath }); + await ensureSourceCached({ s3Client, bucket, key, targetPath: downloadPath, logger: console }); - const metadata = await probeFile(tmpInputPath); + const metadata = await probeFile(downloadPath); const tracks = getSubtitleTracks(metadata); if (!requestedStreamIndex) { @@ -514,7 +559,7 @@ app.get('/api/subtitles', async (req, res) => { res.setHeader('Cache-Control', 'no-cache'); const subtitleCommand = ffmpeg() - .input(tmpInputPath) + .input(downloadPath) .outputOptions(['-map', `0:${numericStreamIndex}`, '-reset_timestamps', '1']) .format('webvtt') .noAudio() @@ -546,7 +591,8 @@ app.get('/api/subtitles', async (req, res) => { app.post('/api/clear-download-cache', (req, res) => { try { clearDownloadCache(); - res.json({ message: 'Download cache cleared' }); + clearConvertCache(); + res.json({ message: 'Download and convert cache cleared' }); } catch (error) { console.error('Error clearing download cache:', error); res.status(500).json({ error: 'Failed to clear download cache', detail: error.message }); @@ -599,7 +645,7 @@ app.get('/api/stream', async (req, res) => { const videoEncoder = availableEncoderValues.has(requestedEncoder) ? requestedEncoder : 'h264_rkmpp'; const requestedVideoDecoder = availableDecoderValues.has(requestedDecoder) ? requestedDecoder : 'auto'; - const { progressKey, tmpInputPath } = getCachePathParts(bucket, key); + const { progressKey, downloadPath, convertPath } = getCachePathParts(bucket, key); const auth = extractS3Credentials(req); const s3Client = createS3Client(auth); @@ -621,7 +667,7 @@ app.get('/api/stream', async (req, res) => { let totalBytes = 0; let downloadedBytes = 0; - const cacheExists = fs.existsSync(tmpInputPath); + const cacheExists = fs.existsSync(downloadPath); if (!cacheExists) { progressMap[progressKey] = { @@ -639,7 +685,8 @@ app.get('/api/stream', async (req, res) => { s3Client, bucket, key, - tmpInputPath, + targetPath: downloadPath, + logger: console, onProgress: ({ totalBytes: nextTotalBytes, downloadedBytes: nextDownloadedBytes }) => { totalBytes = nextTotalBytes; downloadedBytes = nextDownloadedBytes; @@ -669,9 +716,10 @@ app.get('/api/stream', async (req, res) => { }; broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] }); } else { - const stats = fs.statSync(tmpInputPath); + const stats = fs.statSync(downloadPath); totalBytes = stats.size; downloadedBytes = totalBytes; + console.log(`[download] cache ready bucket=${bucket} key=${key} path=${downloadPath} bytes=${totalBytes}`); progressMap[progressKey] = { status: 'downloaded', percent: 100, @@ -687,7 +735,7 @@ app.get('/api/stream', async (req, res) => { // Probe file for duration and broadcast to clients let sourceMetadata = null; try { - const metadata = await probeFile(tmpInputPath); + const metadata = await probeFile(downloadPath); sourceMetadata = metadata; const duration = metadata.format?.duration || 0; progressMap[progressKey] = { @@ -705,11 +753,15 @@ app.get('/api/stream', async (req, res) => { res.setHeader('Connection', 'keep-alive'); let ffmpegCommand = null; + let outputStream = null; + let cacheWriteStream = null; const startStream = (encoderName, decoderName) => { + ensureDirectory(CONVERT_DIR); const streamingOptions = createFfmpegOptions(encoderName).concat(getSeekFriendlyOutputOptions(encoderName, sourceMetadata)); + console.log(`[transcode] start bucket=${bucket} key=${key} encoder=${encoderName} decoder=${decoderName} source=${downloadPath} cache=${convertPath} startSeconds=${startSeconds}`); ffmpegCommand = ffmpeg() - .input(tmpInputPath) + .input(downloadPath) .videoCodec(encoderName) .audioCodec('aac') .outputOptions(streamingOptions) @@ -738,6 +790,24 @@ app.get('/api/stream', async (req, res) => { } transcodeProcesses.set(progressKey, { command: ffmpegCommand, streamSessionId }); + outputStream = new PassThrough(); + cacheWriteStream = fs.createWriteStream(convertPath); + + outputStream.pipe(res); + outputStream.pipe(cacheWriteStream); + + cacheWriteStream.on('finish', () => { + try { + const stats = fs.existsSync(convertPath) ? fs.statSync(convertPath) : null; + console.log(`[transcode] cache written bucket=${bucket} key=${key} path=${convertPath} bytes=${stats?.size || 0}`); + } catch (error) { + console.warn(`[transcode] cache stat failed path=${convertPath}:`, error); + } + }); + + cacheWriteStream.on('error', (error) => { + console.error(`[transcode] cache write failed bucket=${bucket} key=${key} path=${convertPath}:`, error); + }); ffmpegCommand .on('progress', (progress) => { @@ -777,6 +847,7 @@ app.get('/api/stream', async (req, res) => { if (transcodeProcesses.get(progressKey)?.streamSessionId !== streamSessionId) { return; } + console.log(`[transcode] complete bucket=${bucket} key=${key} encoder=${encoderName} cache=${convertPath}`); transcodeProcesses.delete(progressKey); progressMap[progressKey] = { status: 'finished', @@ -793,6 +864,7 @@ app.get('/api/stream', async (req, res) => { if (transcodeProcesses.get(progressKey)?.streamSessionId !== streamSessionId) { return; } + console.error(`[transcode] failed bucket=${bucket} key=${key} encoder=${encoderName} cache=${convertPath}:`, err); transcodeProcesses.delete(progressKey); const failedState = { status: 'failed', @@ -812,7 +884,7 @@ app.get('/api/stream', async (req, res) => { } }); - ffmpegCommand.pipe(res, { end: true }); + ffmpegCommand.pipe(outputStream, { end: true }); }; res.on('close', () => { @@ -824,6 +896,12 @@ app.get('/api/stream', async (req, res) => { if (transcodeProcesses.get(progressKey)?.streamSessionId === streamSessionId) { transcodeProcesses.delete(progressKey); } + if (outputStream && !outputStream.destroyed) { + outputStream.end(); + } + if (cacheWriteStream && !cacheWriteStream.destroyed) { + cacheWriteStream.end(); + } }); startStream(videoEncoder, requestedVideoDecoder);