使用VALKEY

This commit is contained in:
CN-JS-HuiBai
2026-04-03 22:30:35 +08:00
parent e30736c3e7
commit 3c3bd6dcb1
4 changed files with 213 additions and 75 deletions

168
server.js
View File

@@ -4,6 +4,7 @@ const dotenv = require('dotenv');
const fs = require('fs');
const path = require('path');
const http = require('http');
const crypto = require('crypto');
const { PassThrough } = require('stream');
const WebSocket = require('ws');
const ffmpeg = require('fluent-ffmpeg');
@@ -50,6 +51,10 @@ const defaultS3ClientConfig = {
endpoint: process.env.S3_ENDPOINT || parsedBucketUrl,
forcePathStyle: process.env.S3_FORCE_PATH_STYLE === 'true'
};
const VALKEY_URL = process.env.VALKEY_URL || process.env.REDIS_URL || process.env.VALKEY_ADDRESS || '';
const SESSION_COOKIE_NAME = process.env.S3_SESSION_COOKIE_NAME || 'media_coding_s3_session';
const SESSION_TTL_SECONDS = Math.max(300, parseInt(process.env.S3_SESSION_TTL_SECONDS || '2592000', 10) || 2592000);
const isSecureCookie = process.env.SESSION_COOKIE_SECURE === 'true' || process.env.NODE_ENV === 'production';
const PROJECT_ROOT = __dirname;
const DOWNLOADS_DIR = path.join(PROJECT_ROOT, 'Downloads');
@@ -64,6 +69,9 @@ const ensureDirectory = (dirPath) => {
ensureDirectory(DOWNLOADS_DIR);
ensureDirectory(CONVERT_DIR);
let valkeyClient = null;
let valkeyEnabled = false;
const createS3Client = (credentials) => {
const clientConfig = { ...defaultS3ClientConfig };
if (credentials && credentials.username && credentials.password) {
@@ -100,6 +108,126 @@ const SUPPORTED_TEXT_SUBTITLE_CODECS = new Set(['subrip', 'srt', 'ass', 'ssa', '
const getProgressKey = (key) => key.split('/').map(segment => segment.replace(/[^a-zA-Z0-9_\-]/g, '_')).join('/');
const sanitizePathSegment = (value) => value.replace(/[^a-zA-Z0-9._-]/g, '_');
const getSessionStorageKey = (sessionId) => `media-coding-web:s3-session:${sessionId}`;
const parseCookieHeader = (cookieHeader) => {
if (typeof cookieHeader !== 'string' || !cookieHeader.trim()) {
return {};
}
return cookieHeader.split(';').reduce((acc, item) => {
const separatorIndex = item.indexOf('=');
if (separatorIndex === -1) {
return acc;
}
const key = item.slice(0, separatorIndex).trim();
const value = item.slice(separatorIndex + 1).trim();
if (key) {
acc[key] = decodeURIComponent(value);
}
return acc;
}, {});
};
const appendSetCookie = (res, cookieValue) => {
const currentValue = res.getHeader('Set-Cookie');
if (!currentValue) {
res.setHeader('Set-Cookie', cookieValue);
return;
}
const nextValue = Array.isArray(currentValue) ? currentValue.concat(cookieValue) : [currentValue, cookieValue];
res.setHeader('Set-Cookie', nextValue);
};
const setSessionCookie = (res, sessionId) => {
const parts = [
`${SESSION_COOKIE_NAME}=${encodeURIComponent(sessionId)}`,
'Path=/',
'HttpOnly',
'SameSite=Lax',
`Max-Age=${SESSION_TTL_SECONDS}`
];
if (isSecureCookie) {
parts.push('Secure');
}
appendSetCookie(res, parts.join('; '));
};
const clearSessionCookie = (res) => {
appendSetCookie(res, `${SESSION_COOKIE_NAME}=; Path=/; HttpOnly; SameSite=Lax; Max-Age=0${isSecureCookie ? '; Secure' : ''}`);
};
const initializeValkey = async () => {
if (!VALKEY_URL || valkeyClient) {
return;
}
try {
const redisModule = require('redis');
valkeyClient = redisModule.createClient({ url: VALKEY_URL });
valkeyClient.on('error', (error) => {
console.error('[valkey] client error:', error);
valkeyEnabled = false;
});
await valkeyClient.connect();
valkeyEnabled = true;
console.log(`[valkey] connected url=${VALKEY_URL}`);
} catch (error) {
valkeyClient = null;
valkeyEnabled = false;
console.warn('[valkey] unavailable, falling back to request-scoped auth only:', error.message);
}
};
const createValkeySession = async (res, credentials) => {
if (!valkeyEnabled || !valkeyClient || !credentials?.username || !credentials?.password) {
return null;
}
const sessionId = crypto.randomUUID();
await valkeyClient.set(getSessionStorageKey(sessionId), JSON.stringify({
username: credentials.username,
password: credentials.password
}), {
EX: SESSION_TTL_SECONDS
});
setSessionCookie(res, sessionId);
console.log(`[auth] session stored sessionId=${sessionId} ttl=${SESSION_TTL_SECONDS}s`);
return sessionId;
};
const loadValkeySession = async (req) => {
if (!valkeyEnabled || !valkeyClient) {
return null;
}
const cookies = parseCookieHeader(req.headers.cookie || '');
const sessionId = cookies[SESSION_COOKIE_NAME];
if (!sessionId) {
return null;
}
const rawValue = await valkeyClient.get(getSessionStorageKey(sessionId));
if (!rawValue) {
return null;
}
try {
const parsed = JSON.parse(rawValue);
if (!parsed || typeof parsed.username !== 'string' || typeof parsed.password !== 'string') {
return null;
}
await valkeyClient.expire(getSessionStorageKey(sessionId), SESSION_TTL_SECONDS);
return {
username: parsed.username,
password: parsed.password,
source: 'session',
sessionId
};
} catch (error) {
console.error(`[auth] invalid session payload sessionId=${sessionId}:`, error);
return null;
}
};
const createStreamSessionId = () => `${Date.now()}-${Math.random().toString(36).slice(2, 10)}`;
const getCachePathParts = (bucket, key) => {
@@ -371,14 +499,27 @@ const stopActiveTranscode = (progressKey) => {
return true;
};
const extractS3Credentials = (req) => {
const extractS3Credentials = async (req) => {
const query = req.query || {};
const username = req.headers['x-s3-username'] || req.body?.username || query.username || query.accessKeyId || '';
const password = req.headers['x-s3-password'] || req.body?.password || query.password || query.secretAccessKey || '';
return {
const normalizedCredentials = {
username: typeof username === 'string' ? username.trim() : '',
password: typeof password === 'string' ? password : ''
password: typeof password === 'string' ? password : '',
source: 'direct'
};
if (normalizedCredentials.username && normalizedCredentials.password) {
return normalizedCredentials;
}
const sessionCredentials = await loadValkeySession(req);
if (sessionCredentials?.username && sessionCredentials?.password) {
console.log(`[auth] using valkey session sessionId=${sessionCredentials.sessionId}`);
return sessionCredentials;
}
return normalizedCredentials;
};
const wss = new WebSocket.Server({ server });
@@ -449,16 +590,23 @@ const clearConvertCache = () => {
// Endpoint to list available buckets
app.get('/api/buckets', async (req, res) => {
let auth = null;
try {
const auth = extractS3Credentials(req);
auth = await extractS3Credentials(req);
console.log(`[s3] list buckets start endpoint=${defaultS3ClientConfig.endpoint || 'aws-default'} region=${defaultS3ClientConfig.region} authProvided=${Boolean(auth.username)}`);
const s3Client = createS3Client(auth);
const command = new ListBucketsCommand({});
const response = await s3Client.send(command);
const buckets = response.Buckets || [];
if (auth.source === 'direct' && auth.username && auth.password) {
await createValkeySession(res, auth);
}
console.log(`[s3] list buckets complete count=${buckets.length}`);
res.json({ buckets });
} catch (error) {
if (auth?.source === 'session') {
clearSessionCookie(res);
}
console.error('[s3] list buckets failed:', error);
res.status(500).json({ error: 'Failed to list buckets', detail: error.message });
}
@@ -472,7 +620,7 @@ app.get('/api/videos', async (req, res) => {
return res.status(400).json({ error: 'Bucket name is required' });
}
const allObjects = [];
const auth = extractS3Credentials(req);
const auth = await extractS3Credentials(req);
const s3Client = createS3Client(auth);
let continuationToken;
let pageNumber = 0;
@@ -543,7 +691,7 @@ app.get('/api/subtitles', async (req, res) => {
}
const { downloadPath } = getCachePathParts(bucket, key);
const auth = extractS3Credentials(req);
const auth = await extractS3Credentials(req);
const s3Client = createS3Client(auth);
try {
@@ -657,7 +805,7 @@ app.get('/api/stream', async (req, res) => {
const { progressKey, downloadPath, convertPath } = getCachePathParts(bucket, key);
const auth = extractS3Credentials(req);
const auth = await extractS3Credentials(req);
const s3Client = createS3Client(auth);
try {
@@ -925,6 +1073,8 @@ app.get('/api/stream', async (req, res) => {
}
});
server.listen(PORT, HOST, () => {
console.log(`Server running on http://${HOST}:${PORT}`);
initializeValkey().finally(() => {
server.listen(PORT, HOST, () => {
console.log(`Server running on http://${HOST}:${PORT}`);
});
});