import http from "node:http";
import { execFile } from "node:child_process";
import { readFile, unlink, mkdir } from "node:fs/promises";
import { promisify } from "node:util";
import { Innertube } from "youtubei.js";
import { createClient } from "@supabase/supabase-js";
import { GoogleGenAI, Type } from "@google/genai";

const execFileAsync = promisify(execFile);

// ── Config ────────────────────────────────────────────────────

const PORT = Number.parseInt(process.env.PORT || "3001", 10);
const SUPABASE_URL = process.env.SUPABASE_URL;
const SUPABASE_SERVICE_ROLE_KEY = process.env.SUPABASE_SERVICE_ROLE_KEY;
const STORAGE_BUCKET = process.env.STORAGE_BUCKET || "workout-audio";
const GEMINI_API_KEY = process.env.GEMINI_API_KEY || "";

if (!SUPABASE_URL || !SUPABASE_SERVICE_ROLE_KEY) {
  console.error("Missing SUPABASE_URL or SUPABASE_SERVICE_ROLE_KEY");
  process.exit(1);
}

// Public-facing URL for constructing browser-accessible storage URLs.
// SUPABASE_URL is the internal Docker network URL (e.g. http://kong:8000)
// which browsers cannot reach. Trailing slashes are stripped so the URLs
// built from this base never contain "//storage/...".
const SUPABASE_PUBLIC_URL = (
  process.env.SUPABASE_PUBLIC_URL || SUPABASE_URL
).replace(/\/+$/, "");

const supabase = createClient(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY);

// ── Errors ────────────────────────────────────────────────────

/**
 * Error carrying the HTTP status the server should answer with.
 * Anything thrown that is not an HttpError is reported as a 500.
 */
class HttpError extends Error {
  /**
   * @param {number} statusCode - HTTP status code to send.
   * @param {string} message - Message returned in the JSON `error` field.
   * @param {{cause?: unknown}} [options] - Standard Error options.
   */
  constructor(statusCode, message, options) {
    super(message, options);
    this.name = "HttpError";
    this.statusCode = statusCode;
  }
}

// ── YouTube client (singleton, reused across requests) ────────

let ytClient = null;
let ytClientPromise = null;

/**
 * Return the shared Innertube client, creating it on first use.
 *
 * Concurrent callers share one in-flight creation promise. The promise is
 * cleared once it settles — on failure too — so a failed initialization is
 * retried by the next caller instead of being cached forever.
 *
 * @returns {Promise<Innertube>} The initialized client.
 */
async function getYouTubeClient() {
  if (ytClient) return ytClient;
  if (ytClientPromise) return ytClientPromise;

  ytClientPromise = Innertube.create({
    generate_session_locally: true,
    retrieve_player: true,
  })
    .then((client) => {
      ytClient = client;
      console.log("YouTube client initialized");
      return client;
    })
    .finally(() => {
      ytClientPromise = null;
    });

  return ytClientPromise;
}

// Pre-warm the client on startup
getYouTubeClient().catch((err) =>
  console.error("Failed to pre-warm YouTube client:", err.message)
);

// ── Helpers ───────────────────────────────────────────────────

// Characters allowed in bare IDs (playlist IDs, and the video/job IDs that
// end up inside filesystem paths).
const SAFE_ID_PATTERN = /^[A-Za-z0-9_-]+$/;

/**
 * Tell whether a value is a non-empty string made only of ID-safe characters.
 *
 * @param {unknown} value - Candidate ID.
 * @returns {boolean} True when the value is safe to embed in a path.
 */
function isSafeId(value) {
  return typeof value === "string" && SAFE_ID_PATTERN.test(value);
}

/**
 * Extract a playlist ID from either a bare ID or a URL with a `list` param.
 *
 * @param {string} url - Bare playlist ID or a URL containing `?list=...`.
 * @returns {string} The playlist ID.
 * @throws {HttpError} 400 when no playlist ID can be found.
 */
function parsePlaylistId(url) {
  if (typeof url === "string") {
    const candidate = url.trim();
    if (SAFE_ID_PATTERN.test(candidate)) {
      return candidate;
    }

    try {
      const listParam = new URL(candidate).searchParams.get("list");
      if (listParam) return listParam;
    } catch {
      // Not a parseable URL — fall through to the error below.
    }
  }

  throw new HttpError(400, `Could not parse playlist ID from: "${url}"`);
}

/**
 * Read the whole request body and parse it as a JSON object.
 *
 * @param {http.IncomingMessage} req - Incoming request.
 * @returns {Promise<object>} The parsed body (an object or array).
 * @throws {HttpError} 400 when the body is not valid JSON or is not an
 *   object, so handlers can destructure the result safely.
 */
function readBody(req) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    req.on("data", (chunk) => chunks.push(chunk));
    req.on("error", reject);
    req.on("end", () => {
      let parsed;
      try {
        parsed = JSON.parse(Buffer.concat(chunks).toString());
      } catch (err) {
        reject(new HttpError(400, "Invalid JSON body", { cause: err }));
        return;
      }

      // `null`, numbers, strings and booleans are valid JSON but cannot be
      // a request payload here.
      if (parsed === null || typeof parsed !== "object") {
        reject(new HttpError(400, "Invalid JSON body"));
        return;
      }
      resolve(parsed);
    });
  });
}

/**
 * Send a JSON response and end it.
 *
 * @param {http.ServerResponse} res - Response to write to.
 * @param {number} status - HTTP status code.
 * @param {unknown} data - Value serialized with JSON.stringify.
 */
function jsonResponse(res, status, data) {
  res.writeHead(status, { "Content-Type": "application/json" });
  res.end(JSON.stringify(data));
}

/**
 * Convert one playlist page into the item shape returned by /playlist.
 *
 * @param {{items?: Array<object>}} page - Playlist page or continuation.
 * @returns {Array<object>} Items with videoId, title, author,
 *   durationSeconds and thumbnailUrl.
 */
function toPlaylistItems(page) {
  const collected = [];
  for (const item of page.items ?? []) {
    // Only include items that have a video ID (skip Shorts/Reels)
    if (!item.id) continue;
    collected.push({
      videoId: item.id,
      title: item.title?.toString() ?? "Untitled",
      author: item.author?.name ?? null,
      durationSeconds: item.duration?.seconds ?? 0,
      thumbnailUrl: item.thumbnails?.[0]?.url ?? null,
    });
  }
  return collected;
}

// ── Routes ────────────────────────────────────────────────────

/**
 * POST /playlist — list every video of a playlist.
 *
 * Body: `{ playlistUrl }` (a URL or a bare playlist ID).
 * Response: `{ title, items }`, following continuations until exhausted.
 */
async function handlePlaylist(req, res) {
  const { playlistUrl } = await readBody(req);
  if (!playlistUrl) {
    return jsonResponse(res, 400, { error: "playlistUrl is required" });
  }

  const playlistId = parsePlaylistId(playlistUrl);
  const yt = await getYouTubeClient();
  const playlist = await yt.getPlaylist(playlistId);
  const title = playlist.info.title ?? "Untitled Playlist";

  // First page
  const items = toPlaylistItems(playlist);

  // Pagination
  let page = playlist;
  while (page.has_continuation) {
    page = await page.getContinuation();
    items.push(...toPlaylistItems(page));
  }

  console.log(`Playlist "${title}": ${items.length} items`);
  jsonResponse(res, 200, { title, items });
}

/**
 * POST /download — download a video's audio and upload it to storage.
 *
 * Body: `{ videoId, jobId }`. Both end up inside a filesystem path, so they
 * are restricted to ID-safe characters to rule out path traversal.
 * Response: `{ storagePath, publicUrl }`.
 */
async function handleDownload(req, res) {
  const { videoId, jobId } = await readBody(req);
  if (!videoId || !jobId) {
    return jsonResponse(res, 400, { error: "videoId and jobId are required" });
  }
  if (!isSafeId(videoId) || !isSafeId(jobId)) {
    return jsonResponse(res, 400, {
      error: "videoId and jobId may only contain letters, digits, '_' and '-'",
    });
  }

  // Use yt-dlp for the actual download — it handles PO tokens, signature
  // deciphering, and anti-bot measures that youtubei.js cannot.
  const tmpDir = `/tmp/ytdl-${jobId}`;
  const outPath = `${tmpDir}/${videoId}.m4a`;
  await mkdir(tmpDir, { recursive: true });

  try {
    console.log(`Downloading ${videoId} via yt-dlp...`);
    const { stderr } = await execFileAsync(
      "yt-dlp",
      [
        "-f", "ba[ext=m4a]/ba", // best audio in m4a, fallback to best audio
        "--no-playlist",
        "--no-warnings",
        "-x", // extract audio
        "--audio-format", "m4a", // ensure m4a output
        "--audio-quality", "0", // best quality
        "-o", outPath,
        `https://www.youtube.com/watch?v=${videoId}`,
      ],
      { timeout: 120_000 }
    );
    if (stderr) console.warn(`yt-dlp stderr for ${videoId}: ${stderr}`);

    // Read the downloaded file
    const audioData = await readFile(outPath);
    console.log(
      `Downloaded ${videoId}: ${(audioData.length / 1024 / 1024).toFixed(1)} MB`
    );

    // Upload to Supabase Storage — flat structure, no subfolder
    const storagePath = `${videoId}.m4a`;
    const { error: uploadError } = await supabase.storage
      .from(STORAGE_BUCKET)
      .upload(storagePath, audioData, {
        contentType: "audio/mp4",
        cacheControl: "3600",
        upsert: true,
      });
    if (uploadError) {
      throw new Error(`Storage upload failed: ${uploadError.message}`);
    }

    // Construct public URL using the external-facing base URL, not the
    // internal Docker URL that supabase.storage.getPublicUrl() would use.
    const publicUrl = `${SUPABASE_PUBLIC_URL}/storage/v1/object/public/${STORAGE_BUCKET}/${storagePath}`;
    jsonResponse(res, 200, { storagePath, publicUrl });
  } finally {
    // Clean up temp file. Best-effort: the file may not exist if yt-dlp
    // failed. NOTE(review): the per-job directory is left in place —
    // presumably other downloads of the same job share it; confirm whether
    // something else removes it.
    await unlink(outPath).catch(() => {});
  }
}

/**
 * GET /health — liveness probe, always answers `{ status: "ok" }`.
 */
async function handleHealth(_req, res) {
  jsonResponse(res, 200, { status: "ok" });
}

// ── Genre classification ─────────────────────────────────────

const VALID_GENRES = new Set([
  "edm",
  "hip-hop",
  "pop",
  "rock",
  "latin",
  "house",
  "drum-and-bass",
  "dubstep",
  "r-and-b",
  "country",
  "metal",
  "ambient",
]);

const CLASSIFY_SYSTEM_PROMPT = `You classify music tracks into exactly one genre for a fitness/workout app.

Available genres (pick exactly one per track):
- edm: Electronic dance music, techno, trance, electro
- hip-hop: Hip-hop, rap, trap beats
- pop: Pop music, mainstream hits
- rock: Rock, alternative, indie rock, punk
- latin: Reggaeton, salsa, bachata, latin pop
- house: House music, deep house, tech house
- drum-and-bass: Drum and bass, jungle, liquid DnB
- dubstep: Dubstep, bass music, brostep
- r-and-b: R&B, soul, neo-soul
- country: Country, country pop, Americana
- metal: Heavy metal, metalcore, hard rock
- ambient: Ambient, chill, lo-fi, downtempo, meditation

For each track, pick the single best-fit genre.
If the track is clearly a workout/tabata/HIIT track, infer genre from musical style cues in the title.
If truly ambiguous, default to "edm" for workout/tabata tracks.`;

/**
 * Fetch YouTube metadata (category + keywords) for a batch of video IDs.
 * Runs in parallel batches of 10 to avoid rate limits.
 *
 * @param {string[]} videoIds - Video IDs to look up.
 * @returns {Promise<Map<string, {category: string|null, keywords: string[]}>>}
 *   Metadata keyed by video ID; videos whose lookup failed are absent.
 */
async function fetchVideoMetadata(videoIds) {
  const yt = await getYouTubeClient();
  const metadata = new Map();
  const BATCH_SIZE = 10;

  for (let start = 0; start < videoIds.length; start += BATCH_SIZE) {
    const batch = videoIds.slice(start, start + BATCH_SIZE);
    const results = await Promise.allSettled(
      batch.map(async (id) => {
        const info = await yt.getBasicInfo(id);
        return {
          id,
          category: info.basic_info?.category ?? null,
          keywords: info.basic_info?.keywords ?? [],
        };
      })
    );

    for (const result of results) {
      // Rejected = skip that video's metadata; enrichment is optional.
      if (result.status !== "fulfilled") continue;
      const { id, category, keywords } = result.value;
      metadata.set(id, { category, keywords });
    }
  }

  return metadata;
}

/**
 * Classify tracks into genres using Gemini.
 * Batches into groups of 50 to keep schema/prompt manageable.
 *
 * @param {Array<{videoId: string, title: string, author?: string|null,
 *   category?: string|null, keywords?: string[]}>} tracks - Tracks to label.
 * @returns {Promise<Record<string, string>>} Map of videoId to genre. Tracks
 *   from failed batches, or with an answer outside VALID_GENRES, are
 *   omitted. Empty when GEMINI_API_KEY is not set.
 */
async function classifyWithGemini(tracks) {
  if (!GEMINI_API_KEY) {
    console.warn("GEMINI_API_KEY not set — skipping genre classification");
    return {};
  }

  const ai = new GoogleGenAI({ apiKey: GEMINI_API_KEY });
  const BATCH_SIZE = 50;
  const allGenres = {};

  for (let start = 0; start < tracks.length; start += BATCH_SIZE) {
    const batch = tracks.slice(start, start + BATCH_SIZE);
    const batchIds = batch.map((track) => track.videoId);
    const requestedIds = new Set(batchIds);

    // Build concise input for the prompt
    const trackDescriptions = batch.map((track) => {
      const parts = [`"${track.title}"`];
      if (track.author) parts.push(`by ${track.author}`);
      if (track.category) parts.push(`[${track.category}]`);
      if (track.keywords?.length) {
        parts.push(`tags: ${track.keywords.slice(0, 8).join(", ")}`);
      }
      return `${track.videoId}: ${parts.join(" — ")}`;
    });

    const userPrompt = `Classify each track into one genre. Return a JSON object mapping videoId to genre string.\n\n${trackDescriptions.join("\n")}`;

    try {
      const response = await ai.models.generateContent({
        model: "gemini-3.1-flash-lite-preview",
        contents: userPrompt,
        config: {
          systemInstruction: CLASSIFY_SYSTEM_PROMPT,
          responseMimeType: "application/json",
          // One required enum-typed property per video ID in the batch.
          responseSchema: {
            type: Type.OBJECT,
            properties: Object.fromEntries(
              batchIds.map((videoId) => [
                videoId,
                {
                  type: Type.STRING,
                  description: "Genre classification",
                  enum: [...VALID_GENRES],
                },
              ])
            ),
            required: batchIds,
          },
          temperature: 0.1,
        },
      });

      const parsed = JSON.parse(response.text);

      // Validate each answer: the ID must be one we asked about and the
      // genre must be in the allowed set.
      for (const [videoId, genre] of Object.entries(parsed)) {
        if (!requestedIds.has(videoId)) {
          console.warn(`Unexpected videoId "${videoId}" in response — skipping`);
        } else if (VALID_GENRES.has(genre)) {
          allGenres[videoId] = genre;
        } else {
          console.warn(`Invalid genre "${genre}" for ${videoId} — skipping`);
        }
      }
    } catch (batchErr) {
      console.error(
        `Gemini batch ${start / BATCH_SIZE + 1} failed:`,
        batchErr.message
      );
      // Continue with next batch — partial results are fine
    }
  }

  return allGenres;
}

/**
 * POST /classify — assign a genre to each track.
 *
 * Body: `{ items: [{ videoId, title, author? }] }`.
 * Response: `{ genres }` mapping videoId to genre. Failures are reported as
 * a 200 with an empty `genres` and a `warning`, because classification is
 * best-effort and must not block the caller.
 */
async function handleClassify(req, res) {
  const { items } = await readBody(req);
  if (!Array.isArray(items) || items.length === 0) {
    return jsonResponse(res, 400, { error: "items array is required" });
  }

  console.log(`Classifying ${items.length} tracks...`);

  try {
    // Entries without a usable videoId cannot be looked up or keyed in the
    // result, so they are dropped up front.
    const tracks = items.filter(
      (item) => typeof item?.videoId === "string" && item.videoId !== ""
    );
    if (tracks.length < items.length) {
      console.warn(
        `Ignoring ${items.length - tracks.length} items without a videoId`
      );
    }

    // Step 1: Fetch YouTube metadata for enrichment
    console.log("Fetching YouTube metadata...");
    const metadata = await fetchVideoMetadata(
      tracks.map((track) => track.videoId)
    );
    console.log(`Got metadata for ${metadata.size}/${tracks.length} videos`);

    // Step 2: Merge metadata with input items
    const enriched = tracks.map((track) => {
      const meta = metadata.get(track.videoId);
      return {
        videoId: track.videoId,
        title: track.title,
        author: track.author ?? null,
        category: meta?.category ?? null,
        keywords: meta?.keywords ?? [],
      };
    });

    // Step 3: Classify via Gemini
    console.log("Calling Gemini for classification...");
    const genres = await classifyWithGemini(enriched);
    console.log(
      `Classified ${Object.keys(genres).length}/${tracks.length} tracks`
    );

    jsonResponse(res, 200, { genres });
  } catch (err) {
    // Classification is best-effort — never block the import
    console.error("Classification failed:", err.message);
    jsonResponse(res, 200, { genres: {}, warning: err.message });
  }
}

// ── Server ────────────────────────────────────────────────────

// Handlers keyed by "<METHOD> <path>".
const ROUTES = new Map([
  ["GET /health", handleHealth],
  ["POST /playlist", handlePlaylist],
  ["POST /download", handleDownload],
  ["POST /classify", handleClassify],
]);

const server = http.createServer(async (req, res) => {
  try {
    // Match on the path only so a query string does not cause a 404.
    const path = (req.url ?? "").split("?", 1)[0];
    const handler = ROUTES.get(`${req.method} ${path}`);
    if (!handler) {
      jsonResponse(res, 404, { error: "Not found" });
      return;
    }
    await handler(req, res);
  } catch (err) {
    const message = err?.message ?? String(err);
    console.error(`${req.method} ${req.url} error:`, message);

    // Once headers are out, writing a status again would throw inside this
    // async listener; just finish the response.
    if (res.headersSent) {
      if (!res.writableEnded) res.end();
      return;
    }

    const status = err instanceof HttpError ? err.statusCode : 500;
    jsonResponse(res, status, { error: message });
  }
});

server.listen(PORT, () => {
  console.log(`youtube-worker listening on port ${PORT}`);
});