完善日志记录
This commit is contained in:
176
server.js
176
server.js
@@ -2,9 +2,9 @@ const express = require('express');
|
||||
const cors = require('cors');
|
||||
const dotenv = require('dotenv');
|
||||
const fs = require('fs');
|
||||
const os = require('os');
|
||||
const path = require('path');
|
||||
const http = require('http');
|
||||
const { PassThrough } = require('stream');
|
||||
const WebSocket = require('ws');
|
||||
const ffmpeg = require('fluent-ffmpeg');
|
||||
const { S3Client, ListBucketsCommand, ListObjectsV2Command, GetObjectCommand } = require('@aws-sdk/client-s3');
|
||||
@@ -51,6 +51,19 @@ const defaultS3ClientConfig = {
|
||||
forcePathStyle: process.env.S3_FORCE_PATH_STYLE === 'true'
|
||||
};
|
||||
|
||||
const PROJECT_ROOT = __dirname;
|
||||
const DOWNLOADS_DIR = path.join(PROJECT_ROOT, 'Downloads');
|
||||
const CONVERT_DIR = path.join(PROJECT_ROOT, 'convert');
|
||||
|
||||
const ensureDirectory = (dirPath) => {
|
||||
if (!fs.existsSync(dirPath)) {
|
||||
fs.mkdirSync(dirPath, { recursive: true });
|
||||
}
|
||||
};
|
||||
|
||||
ensureDirectory(DOWNLOADS_DIR);
|
||||
ensureDirectory(CONVERT_DIR);
|
||||
|
||||
const createS3Client = (credentials) => {
|
||||
const clientConfig = { ...defaultS3ClientConfig };
|
||||
if (credentials && credentials.username && credentials.password) {
|
||||
@@ -86,17 +99,23 @@ const AVAILABLE_VIDEO_DECODERS = [
|
||||
const SUPPORTED_TEXT_SUBTITLE_CODECS = new Set(['subrip', 'srt', 'ass', 'ssa', 'mov_text', 'text', 'webvtt']);
|
||||
|
||||
const getProgressKey = (key) => key.split('/').map(segment => segment.replace(/[^a-zA-Z0-9_\-]/g, '_')).join('/');
|
||||
const sanitizePathSegment = (value) => value.replace(/[^a-zA-Z0-9._-]/g, '_');
|
||||
|
||||
const createStreamSessionId = () => `${Date.now()}-${Math.random().toString(36).slice(2, 10)}`;
|
||||
const getCachePathParts = (bucket, key) => {
|
||||
const safeKeySegments = key.split('/').map(segment => segment.replace(/[^a-zA-Z0-9_\-]/g, '_'));
|
||||
const safeKeySegments = key.split('/').map(segment => sanitizePathSegment(segment));
|
||||
const progressKey = safeKeySegments.join('/');
|
||||
const safeBucket = bucket.replace(/[^a-zA-Z0-9_\-]/g, '_');
|
||||
const tmpInputPath = path.join(os.tmpdir(), `s3-input-${safeBucket}-${safeKeySegments.join('-')}.tmp`);
|
||||
const safeBucket = sanitizePathSegment(bucket);
|
||||
const downloadFileName = `${safeBucket}--${safeKeySegments.join('--')}`;
|
||||
const downloadPath = path.join(DOWNLOADS_DIR, downloadFileName);
|
||||
const convertBaseName = `${safeBucket}--${safeKeySegments.join('--')}`;
|
||||
const convertPath = path.join(CONVERT_DIR, `${convertBaseName}.tmp`);
|
||||
return {
|
||||
progressKey,
|
||||
safeKeySegments,
|
||||
tmpInputPath
|
||||
safeBucket,
|
||||
downloadPath,
|
||||
convertPath
|
||||
};
|
||||
};
|
||||
|
||||
@@ -217,9 +236,10 @@ const shouldRetryWithSoftware = (message) => {
|
||||
return /Cannot load libcuda\.so\.1|Could not open encoder before EOF|Error while opening encoder|Operation not permitted|Invalid argument|mpp_create|rkmpp/i.test(message);
|
||||
};
|
||||
|
||||
const ensureSourceCached = async ({ s3Client, bucket, key, tmpInputPath, onProgress }) => {
|
||||
if (fs.existsSync(tmpInputPath)) {
|
||||
const stats = fs.statSync(tmpInputPath);
|
||||
const ensureSourceCached = async ({ s3Client, bucket, key, targetPath, onProgress, logger = console }) => {
|
||||
if (fs.existsSync(targetPath)) {
|
||||
const stats = fs.statSync(targetPath);
|
||||
logger.log(`[download] cache hit bucket=${bucket} key=${key} path=${targetPath} bytes=${stats.size}`);
|
||||
return {
|
||||
totalBytes: stats.size,
|
||||
downloadedBytes: stats.size,
|
||||
@@ -227,35 +247,45 @@ const ensureSourceCached = async ({ s3Client, bucket, key, tmpInputPath, onProgr
|
||||
};
|
||||
}
|
||||
|
||||
logger.log(`[download] request start bucket=${bucket} key=${key} target=${targetPath}`);
|
||||
const command = new GetObjectCommand({ Bucket: bucket, Key: key });
|
||||
const response = await s3Client.send(command);
|
||||
const s3Stream = response.Body;
|
||||
const totalBytes = response.ContentLength || 0;
|
||||
let downloadedBytes = 0;
|
||||
try {
|
||||
const response = await s3Client.send(command);
|
||||
const s3Stream = response.Body;
|
||||
const totalBytes = response.ContentLength || 0;
|
||||
let downloadedBytes = 0;
|
||||
|
||||
if (typeof onProgress === 'function') {
|
||||
onProgress({ totalBytes, downloadedBytes, cacheExists: false });
|
||||
}
|
||||
logger.log(`[download] response received bucket=${bucket} key=${key} totalBytes=${totalBytes || 0}`);
|
||||
|
||||
await new Promise((resolve, reject) => {
|
||||
const writeStream = fs.createWriteStream(tmpInputPath);
|
||||
s3Stream.on('data', (chunk) => {
|
||||
downloadedBytes += chunk.length;
|
||||
if (typeof onProgress === 'function') {
|
||||
onProgress({ totalBytes, downloadedBytes, cacheExists: false });
|
||||
}
|
||||
if (typeof onProgress === 'function') {
|
||||
onProgress({ totalBytes, downloadedBytes, cacheExists: false });
|
||||
}
|
||||
|
||||
await new Promise((resolve, reject) => {
|
||||
const writeStream = fs.createWriteStream(targetPath);
|
||||
s3Stream.on('data', (chunk) => {
|
||||
downloadedBytes += chunk.length;
|
||||
if (typeof onProgress === 'function') {
|
||||
onProgress({ totalBytes, downloadedBytes, cacheExists: false });
|
||||
}
|
||||
});
|
||||
s3Stream.on('error', reject);
|
||||
writeStream.on('error', reject);
|
||||
writeStream.on('finish', resolve);
|
||||
s3Stream.pipe(writeStream);
|
||||
});
|
||||
s3Stream.on('error', reject);
|
||||
writeStream.on('error', reject);
|
||||
writeStream.on('finish', resolve);
|
||||
s3Stream.pipe(writeStream);
|
||||
});
|
||||
|
||||
return {
|
||||
totalBytes,
|
||||
downloadedBytes,
|
||||
cacheExists: false
|
||||
};
|
||||
logger.log(`[download] request complete bucket=${bucket} key=${key} path=${targetPath} downloadedBytes=${downloadedBytes} totalBytes=${totalBytes || 0}`);
|
||||
|
||||
return {
|
||||
totalBytes,
|
||||
downloadedBytes,
|
||||
cacheExists: false
|
||||
};
|
||||
} catch (error) {
|
||||
logger.error(`[download] request failed bucket=${bucket} key=${key} target=${targetPath}:`, error);
|
||||
throw error;
|
||||
}
|
||||
};
|
||||
|
||||
const getSubtitleTracks = (metadata) => {
|
||||
@@ -385,13 +415,12 @@ wss.on('connection', (ws) => {
|
||||
|
||||
|
||||
const clearDownloadCache = () => {
|
||||
const tmpDir = os.tmpdir();
|
||||
try {
|
||||
if (!fs.existsSync(tmpDir)) return;
|
||||
const files = fs.readdirSync(tmpDir);
|
||||
ensureDirectory(DOWNLOADS_DIR);
|
||||
const files = fs.readdirSync(DOWNLOADS_DIR);
|
||||
for (const file of files) {
|
||||
if (file.startsWith('s3-input-') && file.endsWith('.tmp')) {
|
||||
const filePath = path.join(tmpDir, file);
|
||||
const filePath = path.join(DOWNLOADS_DIR, file);
|
||||
if (fs.statSync(filePath).isFile()) {
|
||||
fs.rmSync(filePath, { force: true });
|
||||
}
|
||||
}
|
||||
@@ -401,6 +430,22 @@ const clearDownloadCache = () => {
|
||||
}
|
||||
};
|
||||
|
||||
const clearConvertCache = () => {
|
||||
try {
|
||||
ensureDirectory(CONVERT_DIR);
|
||||
const files = fs.readdirSync(CONVERT_DIR);
|
||||
for (const file of files) {
|
||||
const filePath = path.join(CONVERT_DIR, file);
|
||||
if (fs.statSync(filePath).isFile()) {
|
||||
fs.rmSync(filePath, { force: true });
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
console.error('Failed to clear convert cache:', err);
|
||||
throw err;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
// Endpoint to list available buckets
|
||||
app.get('/api/buckets', async (req, res) => {
|
||||
@@ -487,14 +532,14 @@ app.get('/api/subtitles', async (req, res) => {
|
||||
return res.status(400).json({ error: 'Video key is required' });
|
||||
}
|
||||
|
||||
const { tmpInputPath } = getCachePathParts(bucket, key);
|
||||
const { downloadPath } = getCachePathParts(bucket, key);
|
||||
const auth = extractS3Credentials(req);
|
||||
const s3Client = createS3Client(auth);
|
||||
|
||||
try {
|
||||
await ensureSourceCached({ s3Client, bucket, key, tmpInputPath });
|
||||
await ensureSourceCached({ s3Client, bucket, key, targetPath: downloadPath, logger: console });
|
||||
|
||||
const metadata = await probeFile(tmpInputPath);
|
||||
const metadata = await probeFile(downloadPath);
|
||||
const tracks = getSubtitleTracks(metadata);
|
||||
|
||||
if (!requestedStreamIndex) {
|
||||
@@ -514,7 +559,7 @@ app.get('/api/subtitles', async (req, res) => {
|
||||
res.setHeader('Cache-Control', 'no-cache');
|
||||
|
||||
const subtitleCommand = ffmpeg()
|
||||
.input(tmpInputPath)
|
||||
.input(downloadPath)
|
||||
.outputOptions(['-map', `0:${numericStreamIndex}`, '-reset_timestamps', '1'])
|
||||
.format('webvtt')
|
||||
.noAudio()
|
||||
@@ -546,7 +591,8 @@ app.get('/api/subtitles', async (req, res) => {
|
||||
app.post('/api/clear-download-cache', (req, res) => {
|
||||
try {
|
||||
clearDownloadCache();
|
||||
res.json({ message: 'Download cache cleared' });
|
||||
clearConvertCache();
|
||||
res.json({ message: 'Download and convert cache cleared' });
|
||||
} catch (error) {
|
||||
console.error('Error clearing download cache:', error);
|
||||
res.status(500).json({ error: 'Failed to clear download cache', detail: error.message });
|
||||
@@ -599,7 +645,7 @@ app.get('/api/stream', async (req, res) => {
|
||||
const videoEncoder = availableEncoderValues.has(requestedEncoder) ? requestedEncoder : 'h264_rkmpp';
|
||||
const requestedVideoDecoder = availableDecoderValues.has(requestedDecoder) ? requestedDecoder : 'auto';
|
||||
|
||||
const { progressKey, tmpInputPath } = getCachePathParts(bucket, key);
|
||||
const { progressKey, downloadPath, convertPath } = getCachePathParts(bucket, key);
|
||||
|
||||
const auth = extractS3Credentials(req);
|
||||
const s3Client = createS3Client(auth);
|
||||
@@ -621,7 +667,7 @@ app.get('/api/stream', async (req, res) => {
|
||||
|
||||
let totalBytes = 0;
|
||||
let downloadedBytes = 0;
|
||||
const cacheExists = fs.existsSync(tmpInputPath);
|
||||
const cacheExists = fs.existsSync(downloadPath);
|
||||
|
||||
if (!cacheExists) {
|
||||
progressMap[progressKey] = {
|
||||
@@ -639,7 +685,8 @@ app.get('/api/stream', async (req, res) => {
|
||||
s3Client,
|
||||
bucket,
|
||||
key,
|
||||
tmpInputPath,
|
||||
targetPath: downloadPath,
|
||||
logger: console,
|
||||
onProgress: ({ totalBytes: nextTotalBytes, downloadedBytes: nextDownloadedBytes }) => {
|
||||
totalBytes = nextTotalBytes;
|
||||
downloadedBytes = nextDownloadedBytes;
|
||||
@@ -669,9 +716,10 @@ app.get('/api/stream', async (req, res) => {
|
||||
};
|
||||
broadcastWs(progressKey, { type: 'progress', key, progress: progressMap[progressKey] });
|
||||
} else {
|
||||
const stats = fs.statSync(tmpInputPath);
|
||||
const stats = fs.statSync(downloadPath);
|
||||
totalBytes = stats.size;
|
||||
downloadedBytes = totalBytes;
|
||||
console.log(`[download] cache ready bucket=${bucket} key=${key} path=${downloadPath} bytes=${totalBytes}`);
|
||||
progressMap[progressKey] = {
|
||||
status: 'downloaded',
|
||||
percent: 100,
|
||||
@@ -687,7 +735,7 @@ app.get('/api/stream', async (req, res) => {
|
||||
// Probe file for duration and broadcast to clients
|
||||
let sourceMetadata = null;
|
||||
try {
|
||||
const metadata = await probeFile(tmpInputPath);
|
||||
const metadata = await probeFile(downloadPath);
|
||||
sourceMetadata = metadata;
|
||||
const duration = metadata.format?.duration || 0;
|
||||
progressMap[progressKey] = {
|
||||
@@ -705,11 +753,15 @@ app.get('/api/stream', async (req, res) => {
|
||||
res.setHeader('Connection', 'keep-alive');
|
||||
|
||||
let ffmpegCommand = null;
|
||||
let outputStream = null;
|
||||
let cacheWriteStream = null;
|
||||
|
||||
const startStream = (encoderName, decoderName) => {
|
||||
ensureDirectory(CONVERT_DIR);
|
||||
const streamingOptions = createFfmpegOptions(encoderName).concat(getSeekFriendlyOutputOptions(encoderName, sourceMetadata));
|
||||
console.log(`[transcode] start bucket=${bucket} key=${key} encoder=${encoderName} decoder=${decoderName} source=${downloadPath} cache=${convertPath} startSeconds=${startSeconds}`);
|
||||
ffmpegCommand = ffmpeg()
|
||||
.input(tmpInputPath)
|
||||
.input(downloadPath)
|
||||
.videoCodec(encoderName)
|
||||
.audioCodec('aac')
|
||||
.outputOptions(streamingOptions)
|
||||
@@ -738,6 +790,24 @@ app.get('/api/stream', async (req, res) => {
|
||||
}
|
||||
|
||||
transcodeProcesses.set(progressKey, { command: ffmpegCommand, streamSessionId });
|
||||
outputStream = new PassThrough();
|
||||
cacheWriteStream = fs.createWriteStream(convertPath);
|
||||
|
||||
outputStream.pipe(res);
|
||||
outputStream.pipe(cacheWriteStream);
|
||||
|
||||
cacheWriteStream.on('finish', () => {
|
||||
try {
|
||||
const stats = fs.existsSync(convertPath) ? fs.statSync(convertPath) : null;
|
||||
console.log(`[transcode] cache written bucket=${bucket} key=${key} path=${convertPath} bytes=${stats?.size || 0}`);
|
||||
} catch (error) {
|
||||
console.warn(`[transcode] cache stat failed path=${convertPath}:`, error);
|
||||
}
|
||||
});
|
||||
|
||||
cacheWriteStream.on('error', (error) => {
|
||||
console.error(`[transcode] cache write failed bucket=${bucket} key=${key} path=${convertPath}:`, error);
|
||||
});
|
||||
|
||||
ffmpegCommand
|
||||
.on('progress', (progress) => {
|
||||
@@ -777,6 +847,7 @@ app.get('/api/stream', async (req, res) => {
|
||||
if (transcodeProcesses.get(progressKey)?.streamSessionId !== streamSessionId) {
|
||||
return;
|
||||
}
|
||||
console.log(`[transcode] complete bucket=${bucket} key=${key} encoder=${encoderName} cache=${convertPath}`);
|
||||
transcodeProcesses.delete(progressKey);
|
||||
progressMap[progressKey] = {
|
||||
status: 'finished',
|
||||
@@ -793,6 +864,7 @@ app.get('/api/stream', async (req, res) => {
|
||||
if (transcodeProcesses.get(progressKey)?.streamSessionId !== streamSessionId) {
|
||||
return;
|
||||
}
|
||||
console.error(`[transcode] failed bucket=${bucket} key=${key} encoder=${encoderName} cache=${convertPath}:`, err);
|
||||
transcodeProcesses.delete(progressKey);
|
||||
const failedState = {
|
||||
status: 'failed',
|
||||
@@ -812,7 +884,7 @@ app.get('/api/stream', async (req, res) => {
|
||||
}
|
||||
});
|
||||
|
||||
ffmpegCommand.pipe(res, { end: true });
|
||||
ffmpegCommand.pipe(outputStream, { end: true });
|
||||
};
|
||||
|
||||
res.on('close', () => {
|
||||
@@ -824,6 +896,12 @@ app.get('/api/stream', async (req, res) => {
|
||||
if (transcodeProcesses.get(progressKey)?.streamSessionId === streamSessionId) {
|
||||
transcodeProcesses.delete(progressKey);
|
||||
}
|
||||
if (outputStream && !outputStream.destroyed) {
|
||||
outputStream.end();
|
||||
}
|
||||
if (cacheWriteStream && !cacheWriteStream.destroyed) {
|
||||
cacheWriteStream.end();
|
||||
}
|
||||
});
|
||||
|
||||
startStream(videoEncoder, requestedVideoDecoder);
|
||||
|
||||
Reference in New Issue
Block a user