"""
FlowCamoufoxEngine — Browser engine for Google Labs Flow image generation.

Uses Playwright Firefox for browser automation (migrated from Camoufox).

Reverse-engineered API (Feb 2026):
  Endpoint: POST aisandbox-pa.googleapis.com/v1/projects/{project_id}/flowMedia:batchGenerateImages
  Model:    GEM_PIX_2 (displayed as "Nano Banana Pro" in UI)
  Auth:     Bearer token (from session endpoint) + reCAPTCHA (browser-generated)
  Response: Signed GCS URLs for each generated image

Strategy:
1. Keep Firefox browser alive on a Flow project page
2. Install JS fetch() interceptor to capture reCAPTCHA + auth from real requests
3. For each generation: fill prompt → click submit → wait for API response
4. Download images from the returned signed URLs
"""

from __future__ import annotations

import asyncio
import json
import logging
import os
import random
import time
from pathlib import Path
from typing import Any, Optional

import httpx
from dataclasses import dataclass, field

# Module-level logger shared by the helpers and the engine class below.
logger = logging.getLogger(__name__)

# Flow landing page (pt locale). start() loads it first and _enter_project()
# reloads it before each retry.
FLOW_GALLERY = "https://labs.google/fx/pt/tools/flow"
# Short aspect-ratio names mapped to IMAGE_ASPECT_RATIO_* identifiers. The values
# have the same form as the string that _enter_project() passes to
# _configure_generation_settings() ("IMAGE_ASPECT_RATIO_PORTRAIT").
# NOTE(review): presumably used by callers to translate user-facing names — confirm.
ASPECT_RATIOS = {
    "portrait": "IMAGE_ASPECT_RATIO_PORTRAIT",
    "landscape": "IMAGE_ASPECT_RATIO_LANDSCAPE",
    "square": "IMAGE_ASPECT_RATIO_SQUARE",
}

# Persistent profile directory — cookies, localStorage, and settings survive restarts.
# Resolved relative to this file (<parent of this file's directory>/data/camoufox_profile)
# and passed as user_data_dir to Firefox's launch_persistent_context() in start().
_PROFILE_DIR = Path(__file__).resolve().parent.parent / "data" / "camoufox_profile"

# JS fetch interceptor — evaluated in the page by start(). It wraps window.fetch
# once (guarded by window.__flowInterceptorReady) and does three things:
#
#   1. Capture: every fetch() called with an options object whose method is POST
#      and whose URL contains 'aisandbox-pa' is recorded (url, headers, parsed
#      body, timestamp, submitId) in window.__flowCaptured. The pending
#      window.__flowPendingSubmitId is consumed (reset to null) at that moment.
#      Calls that pass no options object (opts undefined) are not captured.
#
#   2. Rewrite: for 'batchGenerateImages' requests with a JSON body, only the
#      LAST entry of body.requests is kept; imageAspectRatio is overridden with
#      window.__flowAspectRatio when set; imageInputs is replaced by the
#      reference image (window.__flowReferenceMediaId, weight
#      window.__flowReferenceWeight or 0.85) or emptied when no reference is set.
#      The prompt that was sent is stored in window.__flowLastPromptSent.
#
#   3. Response: for any URL containing 'batchGenerateImages' the response is
#      cloned and parsed as JSON, stored in window.__flowLastGeneration, appended
#      to window.__flowResponseQueue, and the first mediaGenerationId found is
#      stored in window.__flowLastMediaId. The original response object is
#      always returned to the page unchanged.
#
# Parsing failures on either side are logged to the browser console and do not
# stop the underlying request.
_FETCH_INTERCEPTOR = """
(() => {
    if (window.__flowInterceptorReady) return;
    window.__flowCaptured = [];
    window.__flowLastGeneration = null;
    window.__flowAspectRatio = null;
    window.__flowReferenceMediaId = null;
    window.__flowReferenceWeight = 0.85;
    window.__flowLastMediaId = null;
    window.__flowPendingSubmitId = null;
    window.__flowResponseQueue = [];
    const origFetch = window.fetch;
    window.fetch = async function(...args) {
        const [url, opts] = args;
        const urlStr = typeof url === 'string' ? url : url?.url || '';
        let capturedSubmitId = null;
        if (urlStr.includes('aisandbox-pa') && opts?.method?.toUpperCase() === 'POST') {
            capturedSubmitId = window.__flowPendingSubmitId;
            window.__flowPendingSubmitId = null;  // consumed — prevents next submit from reusing
            const entry = {url: urlStr, headers: {}, body: null, ts: Date.now(), submitId: capturedSubmitId};
            if (opts.headers) {
                if (opts.headers instanceof Headers)
                    opts.headers.forEach((v, k) => { entry.headers[k] = v; });
                else if (typeof opts.headers === 'object')
                    entry.headers = {...opts.headers};
            }
            if (urlStr.includes('batchGenerateImages') && opts.body) {
                try {
                    const body = JSON.parse(opts.body);
                    if (body.requests && body.requests.length > 0) {
                        // Take the LAST request (newest prompt), not first — Flow may accumulate requests
                        const lastIdx = body.requests.length - 1;
                        body.requests = [body.requests[lastIdx]];
                        if (window.__flowAspectRatio) {
                            body.requests[0].imageAspectRatio = window.__flowAspectRatio;
                        }
                        if (window.__flowReferenceMediaId) {
                            body.requests[0].imageInputs = [{
                                mediaId: window.__flowReferenceMediaId,
                                weight: window.__flowReferenceWeight || 0.85
                            }];
                        } else {
                            body.requests[0].imageInputs = [];
                        }
                        const promptText = body.requests[0]?.prompt || body.requests[0]?.text || 'NO_PROMPT_FOUND';
                        console.log('[FlowInterceptor] submitId=' + capturedSubmitId + ' prompt (' + promptText.length + ' chars):', promptText.substring(0, 200));
                        console.log('[FlowInterceptor] requests array had ' + (lastIdx + 1) + ' items, took index ' + lastIdx);
                        window.__flowLastPromptSent = promptText;
                    }
                    opts.body = JSON.stringify(body);
                } catch (e) {
                    console.error('[FlowInterceptor] Error parsing body:', e);
                }
            }
            if (opts.body) {
                try { entry.body = JSON.parse(opts.body); } catch {}
            }
            window.__flowCaptured.push(entry);
        }
        const response = await origFetch.apply(this, args);
        if (urlStr.includes('batchGenerateImages')) {
            try {
                const clone = response.clone();
                const data = await clone.json();
                console.log('[FlowInterceptor] Response received for submitId=' + capturedSubmitId + ', keys:', Object.keys(data).join(','));
                window.__flowLastGeneration = {url: urlStr, response: data, submitId: capturedSubmitId, ts: Date.now()};
                window.__flowResponseQueue.push({submitId: capturedSubmitId, data: data, ts: Date.now()});
                try {
                    const mediaId = data?.media?.[0]?.image?.generatedImage?.mediaGenerationId
                        || data?.imagePanels?.[0]?.generatedImages?.[0]?.mediaGenerationId;
                    if (mediaId) window.__flowLastMediaId = mediaId;
                } catch {}
            } catch (respErr) {
                console.error('[FlowInterceptor] Failed to capture response for submitId=' + capturedSubmitId + ':', respErr);
            }
        }
        return response;
    };
    window.__flowInterceptorReady = true;
})();
"""

# Default character budget applied by _condense_prompt_for_flow().
FLOW_MAX_PROMPT_LENGTH = 1200


def _condense_prompt_for_flow(prompt: str, max_length: int = FLOW_MAX_PROMPT_LENGTH) -> str:
    """Condense *prompt* so that it fits within *max_length* characters.

    Prompts that already fit are returned unchanged. Longer prompts are split
    into paragraphs (blank-line separated) or, when there is only a single
    paragraph, into sentences (``'. '`` separated). Sections are then kept in
    their original order while the budget allows:

    * any section is eligible until half of the budget has been used;
    * after that, only sections containing one of the priority keywords are
      eligible;
    * the first eligible section that no longer fits is truncated (with a
      trailing ``...``) if enough room is left, and selection stops.

    The kept sections are re-joined with the same separator that was used to
    split them. If the original prompt mentioned "photorealistic" but the
    condensed text does not, a short photorealism sentence is appended, trimming
    the condensed text if needed so the budget is still respected.

    Args:
        prompt: The full prompt text.
        max_length: Maximum number of characters allowed in the result.

    Returns:
        The original prompt when it fits, otherwise a condensed version that is
        at most ``max_length`` characters long (for ``max_length >= 0``).
    """
    if len(prompt) <= max_length:
        return prompt

    logger.warning(
        "FlowCamoufoxEngine: Prompt too long (%d chars), condensing to %d",
        len(prompt), max_length
    )

    # Remember which separator produced the sections so the result is re-joined
    # with the same one (paragraphs stay paragraphs, sentences stay sentences).
    separator = '\n\n'
    sections = prompt.split(separator)
    if len(sections) == 1:
        separator = '. '
        sections = prompt.split(separator)

    # All keywords are lowercase; sections are lowercased once before matching.
    priority_keywords = (
        'skeleton', 'skull', 'transparent', 'bone', 'yellow iris',
        'wearing', 'environment', 'cyclorama', 'camera', 'lighting',
        'photorealistic',
    )

    condensed_parts: list[str] = []
    current_length = 0  # running size, counting 2 chars of separator per section

    for section in sections:
        section = section.strip()
        if not section:
            continue

        lowered = section.lower()
        has_priority = any(kw in lowered for kw in priority_keywords)
        if not (has_priority or current_length < max_length // 2):
            continue

        if current_length + len(section) + 2 <= max_length:
            condensed_parts.append(section)
            current_length += len(section) + 2
        elif current_length < max_length - 100:
            # Not enough room for the whole section: keep a truncated head and stop.
            remaining = max_length - current_length - 50
            if remaining > 100:
                condensed_parts.append(section[:remaining] + "...")
            break

    result = separator.join(condensed_parts)

    if not result:
        # Nothing fit the budget (small max_length with oversized sections):
        # fall back to a hard cut instead of returning an empty prompt.
        result = prompt[:max_length].rstrip()

    suffix = ". Photorealistic cinematic realism."
    room = max_length - len(suffix)
    if (
        room >= 0
        and 'photorealistic' in prompt.lower()
        and 'photorealistic' not in result.lower()
    ):
        base = result.rstrip('.')
        if len(base) > room:
            # Make space for the suffix so the total never exceeds max_length.
            base = base[:room].rstrip()
        result = base + suffix

    logger.info("FlowCamoufoxEngine: Condensed prompt to %d chars", len(result))
    return result


class FlowCamoufoxEngine:
    """
    Anti-detect browser engine for Flow image generation with Nano Banana Pro.

    Uses Playwright Firefox with a persistent profile (migrated from Camoufox).

    Usage (async):
        engine = FlowCamoufoxEngine(session_token, csrf_token)
        await engine.start()
        path = await engine.generate("a cat", Path("out.png"))
        await engine.stop()
    """

    def __init__(
        self,
        session_token: str,
        csrf_token: str,
        use_virtual_display: bool = True,
        cookies: list[dict[str, str]] | None = None,
    ):
        """
        Initialize Flow Camoufox Engine.
        
        Args:
            session_token: Google Flow session token
            csrf_token: Google Flow CSRF token
            use_virtual_display: Use virtual display for headless (more stealth)
            cookies: Full list of cookies from GOOGLE_FLOW_COOKIES JSON (all injected)
        """
        self.session_token = session_token
        self.csrf_token = csrf_token
        self.use_virtual_display = use_virtual_display
        self._extra_cookies = cookies or []
        self._playwright_instance = None  # Playwright instance
        self._browser: Any = None  # BrowserContext (persistent_context)
        self._page = None
        self._ready = False
        self._project_id: Optional[str] = None
        self._http_client: httpx.AsyncClient | None = None
        self._last_media_id: str | None = None
        self._current_aspect_ratio: str | None = None
        self._aspect_ratio_configured: bool = False
        
        # Response capture — event-driven FIFO:
        #   _on_response_handler (Playwright network) pushes data into _pending_responses
        #   Each submit registers a Future in _pending_responses (FIFO order)
        #   Handler resolves the OLDEST pending Future → instant, zero polling
        self._loop: asyncio.AbstractEventLoop | None = None
        self._last_captured_response: dict | None = None
        self._response_queue: list[dict] = []  # legacy compat
        self._pending_responses: list[asyncio.Future] = []  # FIFO: oldest submit first
        
        # Parallel generation system (up to 10 simultaneous)
        self._submit_counter: int = 0
        self._active_slots: int = 0
        self._max_slots: int = 10
        self._submission_lock = asyncio.Lock()
        self._sequential_lock = asyncio.Lock()  # Serializes generate_sequential() calls
        self._slot_semaphore: asyncio.Semaphore | None = None
        self._fully_configured: bool = False
        self._sequential_mode: bool = False  # when True, _on_response_handler skips response.json() to avoid IPC deadlock

    @property
    def is_running(self) -> bool:
        """Check if engine is running and browser is still valid."""
        if not self._ready or not self._browser or not self._page:
            return False
        # Try to check if page is still connected
        try:
            # This will fail if the page is closed
            if self._page.is_closed():
                self._ready = False
                return False
            return True
        except Exception:
            self._ready = False
            return False

    @property
    def project_id(self) -> Optional[str]:
        return self._project_id

    @property
    def last_media_id(self) -> str | None:
        """The mediaGenerationId from the last successful image generation."""
        return self._last_media_id

    async def start(self, timeout_s: int = 60) -> bool:
        """Launch Playwright Firefox browser, navigate to Flow, enter a project in image mode."""
        if self._ready:
            return True

        try:
            from playwright.async_api import async_playwright
        except ImportError:
            raise ImportError(
                "Playwright not installed. Install with: pip install playwright && python -m playwright install firefox"
            )

        try:
            # Persistent context keeps cookies/localStorage/settings between restarts
            _PROFILE_DIR.mkdir(parents=True, exist_ok=True)

            logger.info("FlowCamoufoxEngine: Starting Playwright Firefox (PERSISTENT PROFILE at %s)...", _PROFILE_DIR)
            self._loop = asyncio.get_running_loop()
            self._playwright_instance = await async_playwright().start()
            self._browser = await self._playwright_instance.firefox.launch_persistent_context(
                user_data_dir=str(_PROFILE_DIR),
                headless=False,
                viewport={"width": 1280, "height": 900},
                locale="pt-BR",
                user_agent="Mozilla/5.0 (X11; Linux x86_64; rv:146.0) Gecko/20100101 Firefox/146.0",
                firefox_user_prefs={
                    "dom.events.asyncClipboard.readText": True,
                    "dom.events.testing.asyncClipboard": True,
                    "dom.events.asyncClipboard.clipboardItem": True,
                },
            )

            # Reuse existing page from persistent context, or create new one
            if self._browser.pages:
                self._page = self._browser.pages[0]
                logger.info("FlowCamoufoxEngine: Reusing existing page from persistent profile")
            else:
                self._page = await self._browser.new_page()
                logger.info("FlowCamoufoxEngine: Created new page (first launch)")
            
            # Register response handler to capture API responses immediately
            self._page.on("response", self._on_response_handler)

            # Add auth cookies — inject ALL cookies if full list available
            browser_cookies: list[Any] = []
            
            if self._extra_cookies:
                # Full cookie list from GOOGLE_FLOW_COOKIES JSON — inject everything
                seen_names: set[str] = set()
                for cookie in self._extra_cookies:
                    name = cookie.get("name", "")
                    value = cookie.get("value", "")
                    if not name or not value:
                        continue
                    seen_names.add(name)
                    browser_cookies.append({
                        "name": name,
                        "value": value,
                        "domain": "labs.google",
                        "path": "/",
                        "secure": True,
                        "httpOnly": name.startswith("__"),
                        "sameSite": "Lax",
                    })
                # Ensure session + csrf from constructor override stale JSON values
                for existing in browser_cookies:
                    if existing["name"] == "__Secure-next-auth.session-token":
                        existing["value"] = self.session_token
                    elif existing["name"] == "__Host-next-auth.csrf-token":
                        existing["value"] = self.csrf_token
                # Ensure callback-url is always present with CORRECT encoding
                if "__Secure-next-auth.callback-url" not in seen_names:
                    browser_cookies.append({
                        "name": "__Secure-next-auth.callback-url",
                        "value": "https%3A%2F%2Flabs.google%2Ffx%2Fpt%2Ftools%2Fflow",
                        "domain": "labs.google",
                        "path": "/",
                        "secure": True,
                        "httpOnly": True,
                        "sameSite": "Lax",
                    })
                logger.info("FlowCamoufoxEngine: Injecting %d cookies from full cookie list", len(browser_cookies))
            else:
                # Fallback: construct essential cookies from individual tokens
                browser_cookies = [
                    {
                        "name": "__Secure-next-auth.session-token",
                        "value": self.session_token,
                        "domain": "labs.google",
                        "path": "/",
                        "secure": True,
                        "httpOnly": True,
                        "sameSite": "Lax",
                    },
                    {
                        "name": "__Host-next-auth.csrf-token",
                        "value": self.csrf_token,
                        "domain": "labs.google",
                        "path": "/",
                        "secure": True,
                        "httpOnly": True,
                        "sameSite": "Lax",
                    },
                    {
                        "name": "__Secure-next-auth.callback-url",
                        "value": "https%3A%2F%2Flabs.google%2Ffx%2Fpt%2Ftools%2Fflow",
                        "domain": "labs.google",
                        "path": "/",
                        "secure": True,
                        "httpOnly": True,
                        "sameSite": "Lax",
                    },
                ]
                logger.info("FlowCamoufoxEngine: Injecting 3 essential cookies from individual tokens")
            
            await self._page.context.add_cookies(browser_cookies)

            # Navigate to Flow gallery
            logger.info("FlowCamoufoxEngine: Loading gallery %s", FLOW_GALLERY)
            await self._page.goto(
                FLOW_GALLERY, wait_until="domcontentloaded", timeout=timeout_s * 1000
            )
            await asyncio.sleep(8)

            # Enter a project + switch to image mode
            entered = await self._enter_project()
            if not entered:
                raise RuntimeError("Could not enter any Flow project")

            # Verify prompt input exists
            prompt_input = self._page.locator('[role="textbox"]')
            if not (await prompt_input.count() > 0 and await prompt_input.first.is_visible(timeout=5000)):
                prompt_input = self._page.locator("#PINHOLE_TEXT_AREA_ELEMENT_ID")
                if not (await prompt_input.count() > 0 and await prompt_input.first.is_visible(timeout=5000)):
                    raise RuntimeError(
                        f"Prompt input not found after setup. URL={self._page.url}"
                    )

            # Install fetch interceptor
            await self._page.evaluate(_FETCH_INTERCEPTOR)

            self._http_client = httpx.AsyncClient(timeout=60.0, follow_redirects=True)
            self._ready = True
            logger.info("FlowCamoufoxEngine: Ready — project=%s", self._project_id)
            return True

        except Exception as e:
            logger.error("FlowCamoufoxEngine: Start failed — %s", e)
            await self.stop()
            return False

    async def _enter_project(self) -> bool:
        """Enter a Flow project from the gallery page, retrying up to three times.

        Each attempt clicks the first existing project link, or the
        'Novo projeto' button when there is none, then checks whether the page
        URL contains ``/project/``. On success the project id is stored, the
        generation settings are configured for portrait output and the slot
        semaphore is created. On failure the gallery is reloaded before the
        next attempt.

        Returns:
            True once a project page has been reached and configured, False
            when every attempt failed.
        """
        max_attempts = 3

        for attempt in range(max_attempts):
            logger.info(
                "FlowCamoufoxEngine: Entering project (attempt %d)…", attempt + 1
            )

            # Find <a> links to /project/ on the main Flow page
            clicked = await self._page.evaluate("""
                () => {
                    const links = document.querySelectorAll('a[href*="/project/"]');
                    if (links.length > 0) {
                        links[0].click();
                        return 'link';
                    }
                    return null;
                }
            """)

            if clicked:
                logger.info("FlowCamoufoxEngine: Clicked existing project via %s", clicked)
                await asyncio.sleep(8)
            else:
                # No existing projects - click "Novo projeto" button
                logger.info(
                    "FlowCamoufoxEngine: No project links found, clicking 'Novo projeto'…"
                )
                new_project_btn = await self._page.evaluate("""
                    () => {
                        const btns = document.querySelectorAll('button');
                        for (const btn of btns) {
                            if (btn.textContent.includes('Novo projeto')) {
                                btn.click();
                                return true;
                            }
                        }
                        return false;
                    }
                """)
                if new_project_btn:
                    logger.info("FlowCamoufoxEngine: Clicked 'Novo projeto' button")
                    await asyncio.sleep(8)
                else:
                    logger.warning("FlowCamoufoxEngine: 'Novo projeto' button not found")

            # Verify we entered a project
            url = self._page.url
            if "/project/" not in url:
                logger.warning(
                    "FlowCamoufoxEngine: Not in project (%s), retrying…", url
                )
                # Reload the gallery only when another attempt will follow.
                if attempt < max_attempts - 1:
                    await self._page.goto(
                        FLOW_GALLERY, wait_until="domcontentloaded", timeout=30000
                    )
                    await asyncio.sleep(8)
                continue

            # Keep only the path segment right after /project/: drop any query
            # string, fragment, trailing slash or deeper path segments.
            project_id = url.split("/project/")[-1]
            for delimiter in ("?", "#", "/"):
                project_id = project_id.split(delimiter)[0]
            self._project_id = project_id
            logger.info("FlowCamoufoxEngine: Entered project %s", self._project_id)

            await asyncio.sleep(3)

            # Configure settings immediately after entering the project.
            # _configure_generation_settings() logs and swallows its own errors,
            # so the flags below are set whether or not every UI step succeeded.
            logger.info("FlowCamoufoxEngine: Configuring initial settings (Nano Banana Pro, x1, Retrato)...")
            await self._configure_generation_settings("IMAGE_ASPECT_RATIO_PORTRAIT")
            self._current_aspect_ratio = "IMAGE_ASPECT_RATIO_PORTRAIT"
            self._aspect_ratio_configured = True
            self._fully_configured = True

            # Initialize semaphore for slot control
            self._slot_semaphore = asyncio.Semaphore(self._max_slots)

            logger.info("FlowCamoufoxEngine: Browser fully configured and ready!")
            return True

        return False

    async def _configure_generation_settings(self, aspect_ratio: str = "IMAGE_ASPECT_RATIO_PORTRAIT") -> None:
        """Configure generation settings: Retrato, x1, Nano Banana Pro.

        ``aspect_ratio`` is only applied at API level: it is written to
        ``window.__flowAspectRatio`` for the fetch interceptor to use. The UI
        steps always select the Retrato (portrait) tab, the x1 tab and the
        "Pro" model entry, regardless of the value passed in.

        The method is best-effort: it returns early if the settings trigger
        button is not found, logs a warning for each tab or menu item it cannot
        find, and catches any exception (logging it and pressing Escape to
        close open popups). It never raises and always returns None, so callers
        cannot tell from the return value whether the UI was configured.

        Tested patterns (March 2026):
          - Popover trigger: Playwright dispatch_event('pointerdown'+'pointerup'+'click')
          - Radix tabs: page.mouse.click(x, y) at bounding box center
          - Menu items: Playwright dispatch_event('pointerdown'+'pointerup'+'click')
          - API fallback: window.__flowAspectRatio via fetch interceptor

        Args:
            aspect_ratio: IMAGE_ASPECT_RATIO_* string stored in
                ``window.__flowAspectRatio``.
        """
        logger.info("FlowCamoufoxEngine: Configuring settings (Retrato, x1, Nano Banana Pro)...")

        async def _dispatch_click(locator) -> bool:
            """Dispatch pointerdown, pointerup and click on *locator* via dispatch_event.

            Returns True when all three events were dispatched, False (after
            logging a warning) when any dispatch raised.
            NOTE(review): events sent through dispatch_event are presumably
            synthetic rather than browser-trusted — confirm against Playwright docs.
            """
            try:
                await locator.dispatch_event('pointerdown')
                await asyncio.sleep(0.05)
                await locator.dispatch_event('pointerup')
                await asyncio.sleep(0.05)
                await locator.dispatch_event('click')
                return True
            except Exception as e:
                logger.warning("FlowCamoufoxEngine: dispatch_click failed: %s", e)
                return False

        async def _mouse_click_locator(locator) -> bool:
            """Click the center of *locator*'s bounding box with page.mouse.click.

            Returns False when the locator has no bounding box or the click
            raised (a warning is logged in the latter case), True otherwise.
            """
            try:
                bbox = await locator.bounding_box()
                if not bbox:
                    return False
                cx = bbox['x'] + bbox['width'] / 2
                cy = bbox['y'] + bbox['height'] / 2
                await self._page.mouse.click(cx, cy)
                return True
            except Exception as e:
                logger.warning("FlowCamoufoxEngine: mouse_click failed: %s", e)
                return False

        try:
            # ── API-level fallback: set aspect ratio on fetch interceptor ──
            # Done first so it is in place even if every UI step below fails.
            await self._page.evaluate(
                "(ar) => { window.__flowAspectRatio = ar; }",
                aspect_ratio,
            )
            logger.info("FlowCamoufoxEngine: Set window.__flowAspectRatio = %s", aspect_ratio)

            # ── Step 1: Open the settings popover ──
            # Pattern: dispatch_event on button[aria-haspopup='menu'] with 'Nano Banana'
            logger.info("FlowCamoufoxEngine: Step 1 \u2014 Opening settings popover")
            trigger = self._page.locator('button[aria-haspopup="menu"]').filter(has_text='Nano Banana').first
            if await trigger.count() > 0:
                await _dispatch_click(trigger)
                await asyncio.sleep(2.0)  # Wait for Radix popover animation
            else:
                # Without the trigger none of the UI steps can run; only the
                # interceptor-level aspect ratio set above remains in effect.
                logger.error("FlowCamoufoxEngine: Settings trigger button not found")
                return

            # Verify popover opened: heuristic is "at least 4 visible tab buttons".
            popover_visible = await self._page.evaluate("""
                () => {
                    const tabs = document.querySelectorAll('button[role="tab"]');
                    let visible = 0;
                    for (const t of tabs) { if (t.offsetParent) visible++; }
                    return visible >= 4;
                }
            """)
            if not popover_visible:
                # Single retry; the result of the retry is not re-checked.
                logger.warning("FlowCamoufoxEngine: Popover may not have opened, retrying...")
                await _dispatch_click(trigger)
                await asyncio.sleep(2.0)

            # Logged unconditionally, including after an unverified retry.
            logger.info("FlowCamoufoxEngine: Settings popover opened")

            # ── Step 2: Click Retrato tab ──
            # Pattern: page.mouse.click(x, y) — Radix tabs need real mouse events
            logger.info("FlowCamoufoxEngine: Step 2 \u2014 Selecting Retrato")
            retrato = self._page.locator('[id$="-trigger-PORTRAIT"]').first
            if await retrato.count() > 0:
                # Skip the click when the tab already reports data-state="active".
                before = await retrato.get_attribute('data-state')
                if before != 'active':
                    await _mouse_click_locator(retrato)
                    await asyncio.sleep(1.0)
                    after = await retrato.get_attribute('data-state')
                    logger.info("FlowCamoufoxEngine: Retrato %s \u2192 %s", before, after)
                else:
                    logger.info("FlowCamoufoxEngine: Retrato already active")
            else:
                # Fallback: text-based locator
                retrato = self._page.locator('button[role="tab"]').filter(has_text='Retrato').first
                if await retrato.count() > 0:
                    await _mouse_click_locator(retrato)
                    await asyncio.sleep(1.0)
                    logger.info("FlowCamoufoxEngine: Retrato clicked via text locator")
                else:
                    logger.warning("FlowCamoufoxEngine: Retrato tab not found")

            # ── Step 3: Click x1 tab ──
            # Same id-suffix-then-text strategy as the Retrato tab above.
            logger.info("FlowCamoufoxEngine: Step 3 \u2014 Selecting x1")
            x1 = self._page.locator('[id$="-trigger-1"]').first
            if await x1.count() > 0:
                before = await x1.get_attribute('data-state')
                if before != 'active':
                    await _mouse_click_locator(x1)
                    await asyncio.sleep(1.0)
                    after = await x1.get_attribute('data-state')
                    logger.info("FlowCamoufoxEngine: x1 %s \u2192 %s", before, after)
                else:
                    logger.info("FlowCamoufoxEngine: x1 already active")
            else:
                x1 = self._page.locator('button[role="tab"]').filter(has_text='x1').first
                if await x1.count() > 0:
                    await _mouse_click_locator(x1)
                    await asyncio.sleep(1.0)
                    logger.info("FlowCamoufoxEngine: x1 clicked via text locator")
                else:
                    logger.warning("FlowCamoufoxEngine: x1 tab not found")

            # ── Step 4: Open model dropdown ──
            # Pattern: dispatch_event on the model button INSIDE the popover
            logger.info("FlowCamoufoxEngine: Step 4 \u2014 Opening model dropdown")
            model_btn = self._page.locator('[data-radix-popper-content-wrapper] button[aria-haspopup="menu"]').first
            if await model_btn.count() == 0:
                # Fallback: any button inside the popover that mentions the model name.
                model_btn = self._page.locator('[data-radix-popper-content-wrapper] button').filter(has_text='Nano Banana').first

            if await model_btn.count() > 0:
                await _dispatch_click(model_btn)
                await asyncio.sleep(2.0)  # Wait for model dropdown animation

                # ── Step 5: Select Nano Banana Pro ──
                # Matches the first menu item whose text contains 'Pro'.
                logger.info("FlowCamoufoxEngine: Step 5 \u2014 Selecting Nano Banana Pro")
                pro = self._page.locator('[role="menuitem"]').filter(has_text='Pro').first
                if await pro.count() == 0:
                    pro = self._page.locator('[role="menuitemradio"]').filter(has_text='Pro').first

                if await pro.count() > 0:
                    await _dispatch_click(pro)
                    await asyncio.sleep(1.0)
                    logger.info("FlowCamoufoxEngine: \u2713 Nano Banana Pro selected")
                else:
                    logger.warning("FlowCamoufoxEngine: Nano Banana Pro not found in dropdown")
                    # Debug: log visible menu items
                    items = await self._page.evaluate("""
                        () => {
                            const r = [];
                            for (const el of document.querySelectorAll('[role="menuitem"], [role="menuitemradio"]')) {
                                if (el.offsetParent) r.push((el.textContent||'').trim().substring(0,40));
                            }
                            return r;
                        }
                    """)
                    logger.warning("FlowCamoufoxEngine: Available items: %s", items)
            else:
                logger.warning("FlowCamoufoxEngine: Model dropdown button not found in popover")

            await asyncio.sleep(0.5)

            # ── Step 6: Close all popups ──
            # Escape is pressed three times with short pauses in between.
            await self._page.keyboard.press('Escape')
            await asyncio.sleep(0.3)
            await self._page.keyboard.press('Escape')
            await asyncio.sleep(0.3)
            await self._page.keyboard.press('Escape')
            await asyncio.sleep(0.3)

            # ── Step 7: Verify ──
            # Reads back the first visible element below y=400 whose text contains
            # both 'Nano Banana' and 'x'. The outcome is only logged; it does not
            # change what the method returns.
            status = await self._page.evaluate("""
                () => {
                    for (const el of document.querySelectorAll('button, div, span')) {
                        if (!el.offsetParent) continue;
                        const text = (el.textContent || '').trim();
                        const rect = el.getBoundingClientRect();
                        if (text.includes('Nano Banana') && text.includes('x') && rect.y > 400) {
                            return text;
                        }
                    }
                    return '';
                }
            """)
            if status:
                logger.info("FlowCamoufoxEngine: Status bar: %s", status[:120])
                if 'Pro' in status and 'x1' in status:
                    logger.info("FlowCamoufoxEngine: \u2713 All settings verified!")
                elif 'Pro' in status:
                    logger.info("FlowCamoufoxEngine: \u2713 Model=Pro (aspect/count via interceptor)")
                else:
                    logger.warning("FlowCamoufoxEngine: Settings may be incomplete: %s", status[:120])

            logger.info("FlowCamoufoxEngine: Configuration complete!")

        except Exception as e:
            # Best-effort: log, try to close any popup left open, and carry on.
            logger.warning("FlowCamoufoxEngine: Configuration error: %s", e)
            try:
                await self._page.keyboard.press('Escape')
                await asyncio.sleep(0.1)
                await self._page.keyboard.press('Escape')
            except Exception:
                pass
    async def _configure_generation_settings_full(self, aspect_ratio: str = "IMAGE_ASPECT_RATIO_PORTRAIT") -> None:
        """Full configuration with Nano Banana Pro and x1 selection (optional).

        Clicks through the Flow settings UI step by step: select the aspect
        ratio, open the "Nano Banana" settings dropdown, pick ``x1``, open the
        model dropdown, pick "Nano Banana Pro", close the dropdowns and log
        the resulting settings-button text.

        Returns early when the settings button already shows both "Pro" and
        "x1", or when the settings dropdown cannot be opened.

        Best-effort: any ``Exception`` is logged as a warning and followed by
        an Escape key press; nothing is raised to the caller.

        Args:
            aspect_ratio: API aspect-ratio constant forwarded to
                ``_select_aspect_ratio_ui``.
        """
        logger.info("FlowCamoufoxEngine: Starting full configuration...")

        # Returns the trimmed text of the first visible button mentioning
        # "Banana" (or '' when none is found). Used both before and after the
        # configuration steps, so it is defined once.
        read_model_button_js = """
            () => {
                const btns = document.querySelectorAll('button');
                for (const btn of btns) {
                    const text = btn.textContent || '';
                    if (text.includes('Banana') && btn.offsetParent !== null) {
                        return text.trim();
                    }
                }
                return '';
            }
        """

        try:
            # Step 1: Select aspect ratio first
            logger.info("FlowCamoufoxEngine: Step 1 - Selecting aspect ratio")
            await self._select_aspect_ratio_ui(aspect_ratio)
            await asyncio.sleep(0.5)

            # Step 2: Check current model configuration
            logger.info("FlowCamoufoxEngine: Step 2 - Checking current model")
            current_model = await self._page.evaluate(read_model_button_js)
            logger.info("FlowCamoufoxEngine: Current model button text: %s", current_model[:100] if current_model else "NOT FOUND")

            # Step 3: Check if already configured
            if 'Pro' in current_model and 'x1' in current_model:
                logger.info("FlowCamoufoxEngine: Step 3 - Already configured to Nano Banana Pro x1, done!")
                return

            # Step 4: Find and click the Nano Banana settings button
            logger.info("FlowCamoufoxEngine: Step 4 - Opening settings dropdown")

            # Step 5: Try multiple selectors for the settings button
            settings_clicked = False

            # Method A: Playwright locator
            try:
                main_btn = self._page.locator('button:has-text("Nano Banana")').first
                if await main_btn.count() > 0 and await main_btn.is_visible(timeout=2000):
                    await main_btn.click(force=True)
                    settings_clicked = True
                    logger.info("FlowCamoufoxEngine: Step 5A - Clicked via Playwright locator")
            except Exception as e:
                logger.debug("FlowCamoufoxEngine: Step 5A failed: %s", e)

            # Method B: JavaScript click
            if not settings_clicked:
                logger.info("FlowCamoufoxEngine: Step 5B - Trying JS click")
                settings_clicked = await self._page.evaluate("""
                    () => {
                        const btns = document.querySelectorAll('button');
                        for (const btn of btns) {
                            const text = btn.textContent || '';
                            if (text.includes('Nano Banana') && btn.offsetParent !== null) {
                                btn.click();
                                return true;
                            }
                        }
                        return false;
                    }
                """)
                if settings_clicked:
                    logger.info("FlowCamoufoxEngine: Step 5B - Clicked via JS")

            if not settings_clicked:
                logger.warning("FlowCamoufoxEngine: Could not open settings dropdown - using defaults")
                return

            # Step 6: Wait for dropdown to open
            await asyncio.sleep(1.0)

            # Step 7: Select x1 (1 image)
            logger.info("FlowCamoufoxEngine: Step 7 - Selecting x1")
            x1_clicked = await self._page.evaluate("""
                () => {
                    const btns = document.querySelectorAll('button');
                    for (const btn of btns) {
                        const text = btn.textContent || '';
                        // Look for x1 button (not x2, x4)
                        if (text.trim() === 'x1' && btn.offsetParent !== null) {
                            btn.click();
                            return true;
                        }
                    }
                    return false;
                }
            """)
            if x1_clicked:
                logger.info("FlowCamoufoxEngine: Step 7 - Selected x1")
            else:
                logger.warning("FlowCamoufoxEngine: Step 7 - x1 button not found")
            await asyncio.sleep(0.5)

            # Step 8: Find the model dropdown (arrow_drop_down icon)
            logger.info("FlowCamoufoxEngine: Step 8 - Opening model dropdown")
            model_dropdown_clicked = await self._page.evaluate("""
                () => {
                    const btns = document.querySelectorAll('button');
                    for (const btn of btns) {
                        const text = btn.textContent || '';
                        // Look for dropdown with arrow icon inside the settings panel
                        if (text.includes('arrow_drop_down') && btn.offsetParent !== null) {
                            const rect = btn.getBoundingClientRect();
                            // Should be in the settings area (not too high on page)
                            if (rect.y > 400) {
                                btn.click();
                                return true;
                            }
                        }
                    }
                    return false;
                }
            """)
            if model_dropdown_clicked:
                logger.info("FlowCamoufoxEngine: Step 8 - Opened model dropdown")
                await asyncio.sleep(0.8)
            else:
                logger.warning("FlowCamoufoxEngine: Step 8 - Model dropdown not found")

            # Step 9: Select Nano Banana Pro
            logger.info("FlowCamoufoxEngine: Step 9 - Selecting Nano Banana Pro")
            pro_clicked = await self._page.evaluate("""
                () => {
                    // Look for menu items or buttons with "Pro"
                    const elements = document.querySelectorAll('button, [role="menuitem"], [role="option"]');
                    for (const el of elements) {
                        const text = el.textContent || '';
                        if (text.includes('Nano Banana Pro') && el.offsetParent !== null) {
                            el.click();
                            return true;
                        }
                    }
                    // Also try looking for just "Pro" in dropdown context
                    const btns = document.querySelectorAll('button');
                    for (const btn of btns) {
                        const text = btn.textContent || '';
                        if (text.includes('Pro') && text.includes('Banana') && btn.offsetParent !== null) {
                            btn.click();
                            return true;
                        }
                    }
                    return false;
                }
            """)
            if pro_clicked:
                logger.info("FlowCamoufoxEngine: Step 9 - Selected Nano Banana Pro")
            else:
                logger.warning("FlowCamoufoxEngine: Step 9 - Nano Banana Pro not found")
            await asyncio.sleep(0.5)

            # Step 10: Close any open dropdowns
            logger.info("FlowCamoufoxEngine: Step 10 - Closing dropdowns")
            await self._page.keyboard.press('Escape')
            await asyncio.sleep(0.3)

            # Step 11: Verify configuration (only logged; no retry on mismatch)
            logger.info("FlowCamoufoxEngine: Step 11 - Verifying configuration")
            final_model = await self._page.evaluate(read_model_button_js)
            logger.info("FlowCamoufoxEngine: Configuration complete. Model: %s", final_model[:100] if final_model else "UNKNOWN")

        except Exception as e:
            logger.warning("FlowCamoufoxEngine: Could not configure settings: %s", e)
            # Try to close any open dropdowns
            try:
                await self._page.keyboard.press('Escape')
            except Exception:
                # Cleanup is best-effort. Catch only Exception so that task
                # cancellation (CancelledError is a BaseException) and
                # KeyboardInterrupt still propagate.
                pass

    async def _select_aspect_ratio_ui(self, aspect_ratio: str) -> None:
        """Select aspect ratio in the Flow UI (Portrait/Landscape/Square).

        Flow UI uses Portuguese labels: Retrato (Portrait), Paisagem (Landscape)
        The aspect ratio buttons are inside the settings dropdown (Nano Banana button).

        The work is done in two separate phases so that merely opening the
        dropdown is never reported as a successful selection:

        1. Open the settings dropdown (Playwright click, then a JS click
           fallback).
        2. Click the aspect-ratio option, first by its Portuguese label, then
           by its icon name (``crop_portrait`` / ``crop_landscape`` /
           ``crop_square``).

        The dropdown is closed with Escape afterwards. The method is
        best-effort: failures are logged and never raised.

        NOTE(review): the option selectors are inferred from the label/icon
        names used in this file, not from the live DOM — confirm against the
        current Flow UI.

        Args:
            aspect_ratio: API aspect-ratio constant; unknown values fall back
                to the portrait label.
        """
        logger.info("FlowCamoufoxEngine: _select_aspect_ratio_ui called for %s", aspect_ratio)

        # Check if we're on the right page (project page with prompt input)
        try:
            prompt_input = self._page.locator('[role="textbox"]')
            if not (await prompt_input.count() > 0):
                logger.warning("FlowCamoufoxEngine: Not on project page, skipping aspect ratio selection")
                return
        except Exception:
            logger.warning("FlowCamoufoxEngine: Could not verify page state, skipping aspect ratio selection")
            return

        # Map API aspect ratio to Portuguese button text
        aspect_text_map = {
            "IMAGE_ASPECT_RATIO_PORTRAIT": "Retrato",
            "IMAGE_ASPECT_RATIO_LANDSCAPE": "Paisagem",
            "IMAGE_ASPECT_RATIO_SQUARE": "Quadrado",
        }
        target_text = aspect_text_map.get(aspect_ratio, "Retrato")

        # Icon name rendered as text next to (or instead of) the label
        icon_map = {"Retrato": "crop_portrait", "Paisagem": "crop_landscape", "Quadrado": "crop_square"}
        icon_text = icon_map.get(target_text, "crop_portrait")

        try:
            # ── Phase 1: open the settings dropdown ──
            logger.info("FlowCamoufoxEngine: Step 1 - Opening Nano Banana dropdown...")
            dropdown_opened = False

            # Method 1: Playwright locator with force
            try:
                nano_btn = self._page.locator('button:has-text("Nano Banana")').first
                await nano_btn.click(force=True, timeout=3000)
                dropdown_opened = True
                logger.info("FlowCamoufoxEngine: Clicked Nano Banana via Playwright locator")
            except Exception as e1:
                logger.warning("FlowCamoufoxEngine: Playwright click failed: %s", e1)

            # Method 2: JavaScript click
            if not dropdown_opened:
                try:
                    dropdown_opened = await self._page.evaluate("""
                        () => {
                            const btns = document.querySelectorAll('button');
                            for (const btn of btns) {
                                const text = btn.textContent || '';
                                if (text.includes('Nano Banana') || text.includes('🍌')) {
                                    // Exactly one click: a second synthetic click
                                    // would toggle the dropdown closed again.
                                    btn.click();
                                    return true;
                                }
                            }
                            return false;
                        }
                    """)
                    if dropdown_opened:
                        logger.info("FlowCamoufoxEngine: Clicked Nano Banana via JS click")
                except Exception as e2:
                    logger.warning("FlowCamoufoxEngine: JS click failed: %s", e2)

            if dropdown_opened:
                # Give the dropdown time to render its options
                await asyncio.sleep(0.5)
            else:
                logger.warning("FlowCamoufoxEngine: Could not open Nano Banana dropdown, trying aspect option anyway")

            # ── Phase 2: click the aspect-ratio option itself ──
            clicked_aspect = False
            try:
                result = await self._page.evaluate("""
                    (args) => {
                        const isTrigger = (t) => t.includes('Nano Banana') || t.includes('🍌');

                        // Pass 1: visible control labelled with the aspect ratio name
                        const controls = document.querySelectorAll(
                            'button, [role="menuitem"], [role="option"], [role="radio"], [role="tab"]'
                        );
                        for (const el of controls) {
                            if (!el.offsetParent) continue;
                            const text = (el.textContent || '').trim();
                            // Skip the dropdown trigger: clicking it would close the dropdown
                            if (text.includes(args.label) && !isTrigger(text)) {
                                el.click();
                                return 'label';
                            }
                        }

                        // Pass 2: element whose whole text is the icon name
                        for (const el of document.querySelectorAll('*')) {
                            if (!el.offsetParent) continue;
                            const text = (el.textContent || '').trim();
                            if (text === args.icon) {
                                const btn = el.closest('button') || el;
                                if (isTrigger(btn.textContent || '')) continue;
                                btn.click();
                                return 'icon';
                            }
                        }
                        return 'not_found';
                    }
                """, {"label": target_text, "icon": icon_text})
                if result == 'label':
                    clicked_aspect = True
                    logger.info("FlowCamoufoxEngine: Clicked %s via label text", target_text)
                elif result == 'icon':
                    clicked_aspect = True
                    logger.info("FlowCamoufoxEngine: Clicked %s via icon text", target_text)
            except Exception as e3:
                logger.warning("FlowCamoufoxEngine: Aspect option click failed: %s", e3)

            if clicked_aspect:
                logger.info("FlowCamoufoxEngine: Successfully selected aspect ratio %s", aspect_ratio)
                await asyncio.sleep(0.5)
            else:
                logger.error("FlowCamoufoxEngine: Failed to click %s with all methods", target_text)

            # Close dropdown by pressing Escape
            await self._page.keyboard.press('Escape')
            await asyncio.sleep(0.3)

        except Exception as e:
            logger.error("FlowCamoufoxEngine: _select_aspect_ratio_ui error: %s", e)
            # Try to close any open dropdown
            try:
                await self._page.keyboard.press('Escape')
            except Exception:
                # Best-effort cleanup; catch only Exception so cancellation
                # (a BaseException) is not swallowed.
                pass

    async def _on_response_handler(self, response) -> None:
        """Network-response callback that captures ``batchGenerateImages`` results.

        Sequential mode (``_sequential_mode`` truthy): the body is never read.
        The in-page fetch interceptor holds the data and it is read through
        ``page.evaluate()`` elsewhere; per the original author's note, calling
        ``response.json()`` here would block Playwright's IPC channel in
        Firefox and deadlock those ``page.evaluate()`` calls.

        Parallel mode: the parsed JSON is stored in
        ``_last_captured_response`` and handed to the oldest pending Future
        that is not yet done. When no such Future exists the payload is
        appended to ``_response_queue``. The first media generation id found
        in the payload is stored in ``_last_media_id``.

        Non-200 responses and any exception while reading or parsing the
        response are logged and otherwise ignored.
        """
        target_url = response.url

        is_google_api = "googleapis.com" in target_url or "aisandbox" in target_url
        if is_google_api:
            logger.debug("FlowCamoufoxEngine: [NET] %s %s (status=%d)",
                        response.request.method, target_url[:150], response.status)

        if "batchGenerateImages" not in target_url:
            return

        logger.info("FlowCamoufoxEngine: [RESPONSE] batchGenerateImages (status=%d, sequential=%s)",
                   response.status, self._sequential_mode)

        # Sequential mode must never touch the body (see docstring)
        if self._sequential_mode:
            logger.info("FlowCamoufoxEngine: [RESPONSE] Sequential mode — skipping response.json() (JS interceptor handles capture)")
            return

        try:
            if response.status != 200:
                logger.warning("FlowCamoufoxEngine: [RESPONSE] non-200: status=%d", response.status)
                return

            payload = await response.json()
            logger.info("FlowCamoufoxEngine: [RESPONSE] Captured! keys=%s", list(payload.keys()))

            self._last_captured_response = payload

            # Hand the payload to the oldest waiter that is still open
            # (FIFO order = submission order); already-done futures are dropped.
            delivered = False
            while self._pending_responses and not delivered:
                waiter = self._pending_responses.pop(0)
                if waiter.done():
                    continue
                waiter.set_result(payload)
                logger.info("FlowCamoufoxEngine: [RESPONSE] ✓ Resolved pending future (remaining=%d)",
                           len(self._pending_responses))
                delivered = True

            if not delivered:
                # Nobody is waiting — keep the payload for later pickup
                self._response_queue.append(payload)
                logger.warning("FlowCamoufoxEngine: [RESPONSE] No pending future, queued (queue=%d)",
                              len(self._response_queue))

            # Pull the media id out of whichever response shape is present:
            # the "media" list takes precedence over "imagePanels".
            media_entries = payload.get("media", [])
            panel_entries = payload.get("imagePanels", [])
            found_id = None
            id_log_format = "FlowCamoufoxEngine: [LAYER2] media_id=%s"
            if media_entries:
                generated = media_entries[0].get("image", {}).get("generatedImage", {})
                found_id = generated.get("mediaGenerationId")
            elif panel_entries:
                id_log_format = "FlowCamoufoxEngine: [LAYER2] media_id (panel)=%s"
                panel_images = panel_entries[0].get("generatedImages", [])
                if panel_images:
                    found_id = panel_images[0].get("mediaGenerationId")

            if found_id:
                self._last_media_id = found_id
                logger.info(id_log_format, found_id[:30])
        except Exception as e:
            logger.warning("FlowCamoufoxEngine: [LAYER2] Failed to capture response: %s", e)

    async def stop(self):
        """Shut down the HTTP client, the browser and the Playwright driver.

        Marks the engine as not ready, then tears down each resource in turn.
        Every step is best-effort: a failure is logged at debug level and the
        remaining steps still run, so a half-broken browser cannot block the
        rest of the shutdown. References are cleared even when the
        corresponding close call fails.
        """
        self._ready = False

        try:
            if self._http_client:
                await self._http_client.aclose()
        except Exception as e:
            logger.debug("FlowCamoufoxEngine: HTTP client close failed: %s", e)
        self._http_client = None

        try:
            if self._browser:
                await self._browser.close()
        except Exception as e:
            logger.debug("FlowCamoufoxEngine: Browser close failed: %s", e)
        self._browser = self._page = None

        # The attribute may not exist when start() never got that far
        playwright_instance = getattr(self, '_playwright_instance', None)
        if playwright_instance:
            try:
                await playwright_instance.stop()
            except Exception as e:
                logger.debug("FlowCamoufoxEngine: Playwright stop failed: %s", e)
            finally:
                # Drop the reference even when stop() raised, so a later
                # stop() does not retry on a dead driver.
                self._playwright_instance = None

        logger.info("FlowCamoufoxEngine: Stopped")

    async def _refresh_session(self):
        """Reload the Flow gallery and re-enter the project to refresh reCAPTCHA context.

        Navigates to ``FLOW_GALLERY``, waits, calls ``_enter_project()``,
        checks that a prompt input is present and visible, and re-installs the
        fetch interceptor.

        Raises:
            RuntimeError: if the project cannot be re-entered or no prompt
                input is found afterwards.
        """
        logger.info("FlowCamoufoxEngine: Refreshing session (page reload)…")
        await self._page.goto(FLOW_GALLERY, wait_until="domcontentloaded", timeout=30000)
        await asyncio.sleep(8)

        if not await self._enter_project():
            raise RuntimeError("Failed to re-enter project after session refresh")

        # Accept the first selector that yields a present, visible prompt input
        for selector in ('[role="textbox"]', "#PINHOLE_TEXT_AREA_ELEMENT_ID"):
            candidate = self._page.locator(selector)
            if await candidate.count() > 0 and await candidate.first.is_visible(timeout=5000):
                break
        else:
            raise RuntimeError("Prompt input not found after session refresh")

        # The navigation replaced the page's JS context, so install the interceptor again
        await self._page.evaluate(_FETCH_INTERCEPTOR)
        logger.info("FlowCamoufoxEngine: Session refreshed ✓")

    async def _full_restart(self):
        """Stop and start the whole browser to get a clean reCAPTCHA state.

        Raises:
            RuntimeError: if ``start()`` reports failure.
        """
        logger.info("FlowCamoufoxEngine: Full browser restart…")
        self._ready = False
        await self.stop()

        if not await self.start():
            raise RuntimeError("Failed to restart FlowCamoufoxEngine")

        logger.info("FlowCamoufoxEngine: Full restart complete ✓")

    async def _human_like_behavior(self):
        """Perform a few random mouse moves and one scroll to look like a human user.

        Intended to help maintain the reCAPTCHA score. Purely cosmetic: any
        ``Exception`` raised along the way is deliberately ignored.
        """
        try:
            moves_left = random.randint(2, 4)
            while moves_left > 0:
                target_x = random.randint(200, 1000)
                target_y = random.randint(100, 700)
                await self._page.mouse.move(target_x, target_y, steps=random.randint(5, 15))
                await asyncio.sleep(random.uniform(0.1, 0.3))
                moves_left -= 1

            wheel_delta = random.randint(-150, 150)
            await self._page.mouse.wheel(0, wheel_delta)
            await asyncio.sleep(random.uniform(0.2, 0.5))
        except Exception:
            pass

    @staticmethod
    def _is_recaptcha_error(error_msg: str) -> bool:
        """Check if an error is reCAPTCHA-related (needs session recovery)."""
        indicators = ["recaptcha", "PERMISSION_DENIED", "captcha"]
        return any(ind.lower() in error_msg.lower() for ind in indicators)

    async def set_reference_image(self, media_id: str, weight: float = 0.85) -> None:
        """Store a reference image id and weight on the page for the next generation.

        Writes ``window.__flowReferenceMediaId`` and
        ``window.__flowReferenceWeight`` in the browser page.

        Args:
            media_id: mediaGenerationId of the image to use as reference.
            weight: reference weight written alongside the id.
        """
        setter_js = "(args) => { window.__flowReferenceMediaId = args.mediaId; window.__flowReferenceWeight = args.weight; }"
        payload = {"mediaId": media_id, "weight": weight}
        await self._page.evaluate(setter_js, payload)

        logger.info(
            "FlowCamoufoxEngine: Reference image set — media_id=%s… weight=%.2f",
            media_id[:20],
            weight,
        )

    async def clear_reference_image(self) -> None:
        """Reset ``window.__flowReferenceMediaId`` to null so the next generation is text-only.

        The stored reference weight is left untouched.
        """
        reset_js = "() => { window.__flowReferenceMediaId = null; }"
        await self._page.evaluate(reset_js)
        logger.info("FlowCamoufoxEngine: Reference image cleared")

    # Seconds slept by generate_batch() after a batch has finished. Read once at
    # class-definition time from FLOW_GENERATION_COOLDOWN (default "4"); a value
    # that is not an integer string makes int() raise at import.
    GENERATION_COOLDOWN = int(os.getenv("FLOW_GENERATION_COOLDOWN", "4"))
    # Read from FLOW_BATCH_COOLDOWN (default "2"), same int() parsing as above.
    # NOTE(review): presumably a pause in seconds between batches — usage is not
    # visible in this part of the file; confirm.
    BATCH_COOLDOWN = int(os.getenv("FLOW_BATCH_COOLDOWN", "2"))
    # NOTE(review): presumably the maximum number of attempts per generation and
    # the wait in seconds before each retry (one entry per attempt) — confirm
    # against the retry loop that consumes them.
    MAX_RETRIES = 4
    RETRY_BACKOFF = [5, 10, 20, 30]
    # NOTE(review): presumably the default number of items per batch — confirm
    # against the callers.
    DEFAULT_BATCH_SIZE = 10

    async def generate(
        self,
        prompt: str,
        output_path: Path | str,
        aspect_ratio: str = "IMAGE_ASPECT_RATIO_PORTRAIT",
        seed: int | None = None,
    ) -> Path:
        """Produce one image for *prompt* with Flow (Nano Banana Pro / GEM_PIX_2).

        Thin public entry point: normalises ``output_path`` to a ``Path`` and
        delegates to ``_generate_impl``, returning whatever it returns.
        """
        destination = Path(output_path)
        generated = await self._generate_impl(prompt, destination, aspect_ratio, seed)
        return generated

    async def generate_batch(
        self,
        items: list[tuple[str, Path | str]],
        aspect_ratio: str = "IMAGE_ASPECT_RATIO_PORTRAIT",
        on_progress: Optional[callable] = None,
    ) -> list[tuple[Path, str | None] | None]:
        """Generate multiple images with PARALLEL submission + parallel response capture.
        
        Submissions are serialized (clipboard can only paste one prompt at a time),
        but response waiting runs in parallel across all submitted prompts.
        
        Returns list of (path, media_id) tuples or None for failed items.
        """
        total = len(items)

        async def _gen_one(idx, prompt, output_path):
            try:
                result = await self._generate_impl_fast(prompt, Path(output_path), aspect_ratio, None)
                if on_progress:
                    on_progress(idx + 1, total)
                return result
            except Exception as e:
                logger.error("FlowCamoufoxEngine: Batch item %d/%d failed: %s", idx + 1, total, e)
                return None

        tasks = [_gen_one(idx, prompt, out) for idx, (prompt, out) in enumerate(items)]
        results = await asyncio.gather(*tasks)
        await asyncio.sleep(self.GENERATION_COOLDOWN)
        return list(results)

    async def _submit_prompt_fifo(self, prompt: str, aspect_ratio: str) -> int:
        """Submit prompt (fire-and-forget) and return submit_id for correlation.
        
        Flow: JS focus → clear → clipboard paste (Ctrl+V) → click submit button.
        Uses clipboard for INSTANT prompt input instead of slow letter-by-letter typing.
        Response is captured by Playwright network handler (_on_response_handler).
        """
        submit_id = self._submit_counter
        self._submit_counter += 1
        
        logger.info("FlowCamoufoxEngine: ┌─ _submit_prompt_fifo(submit_id=%d) ENTRY ─┐", submit_id)
        logger.info("FlowCamoufoxEngine: │ prompt[:%d] = '%s...'", 
                    min(80, len(prompt)), prompt[:80].replace('\n', ' '))
        
        # Set submit ID in JS for response correlation
        await self._page.evaluate(f"window.__flowPendingSubmitId = {submit_id}")
        
        # Ensure fetch interceptor is installed + set aspect ratio
        await self._page.evaluate(_FETCH_INTERCEPTOR)
        await self._page.evaluate(
            "(ar) => { window.__flowAspectRatio = ar; }",
            aspect_ratio,
        )

        # Find textbox element
        prompt_input = self._page.locator('[role="textbox"]')
        if not (await prompt_input.count() > 0 and await prompt_input.first.is_visible(timeout=3000)):
            prompt_input = self._page.locator("#PINHOLE_TEXT_AREA_ELEMENT_ID")
            if not (await prompt_input.count() > 0 and await prompt_input.first.is_visible(timeout=3000)):
                raise RuntimeError("Prompt input not found on Flow page")

        # ── Slate.js text input — ATOMIC CLEAR+INSERT via Slate React internals ──
        # Playwright Firefox sandboxes clipboard (caches after first Ctrl+C).
        # Synthetic events and keyboard.insertText() also don't work with Slate.
        # PROVEN WORKING: Slate editor.insertText() via React fiber tree.
        # Key fix: clear+insert in ONE atomic JS call to avoid race conditions.
        clean_prompt = prompt.replace('\n', ' ').replace('\r', ' ')
        expected_start = clean_prompt[:30].strip().lower()

        # Helper: read editor content (filters out placeholder text)
        async def _read_editor() -> str:
            return await self._page.evaluate("""
                () => {
                    const el = document.querySelector('[data-slate-editor="true"]')
                              || document.querySelector('[role="textbox"]');
                    if (!el) return '';
                    const text = el.textContent || '';
                    if (text.trim() === 'O que você quer criar?' || text.trim() === '') return '';
                    return text.trim();
                }
            """)

        async def _verify_prompt(strategy_name: str) -> bool:
            actual = await _read_editor()
            actual_start = actual[:30].strip().lower()
            ok = actual_start == expected_start and len(actual) > len(clean_prompt) * 0.3
            if ok:
                logger.info("FlowCamoufoxEngine: [submit_id=%d] ✓ %s VERIFIED — editor='%s...' matches expected='%s...'",
                           submit_id, strategy_name, actual[:50], clean_prompt[:50])
            else:
                logger.warning("FlowCamoufoxEngine: [submit_id=%d] ✗ %s MISMATCH — editor='%s...' vs expected='%s...' (len %d vs %d)",
                              submit_id, strategy_name, actual[:50], clean_prompt[:50], len(actual), len(clean_prompt))
            return ok

        # Locate the Slate editor
        slate_el = self._page.locator('[data-slate-editor="true"]')
        if await slate_el.count() == 0:
            slate_el = self._page.locator('[role="textbox"]')

        prompt_verified = False

        # ── PRIMARY: Slate React internals — ATOMIC clear + insert ──
        # Find Slate editor via React fiber, nuke all children, insert fresh text.
        # Everything happens in ONE page.evaluate() call — no race conditions.
        try:
            result = await self._page.evaluate("""
                (text) => {
                    const el = document.querySelector('[data-slate-editor="true"]');
                    if (!el) return { ok: false, reason: 'no slate element' };
                    
                    // Find React fiber
                    const fiberKey = Object.keys(el).find(k => 
                        k.startsWith('__reactFiber$') || k.startsWith('__reactInternalInstance$')
                    );
                    if (!fiberKey) return { ok: false, reason: 'no react fiber' };
                    
                    // Walk up fiber tree to find Slate editor instance
                    let fiber = el[fiberKey];
                    let editor = null;
                    let depth = 0;
                    while (fiber && depth < 50) {
                        const props = fiber.memoizedProps || fiber.pendingProps;
                        if (props && props.editor && typeof props.editor.insertText === 'function') {
                            editor = props.editor;
                            break;
                        }
                        fiber = fiber.return;
                        depth++;
                    }
                    
                    if (!editor) return { ok: false, reason: 'editor not found (depth=' + depth + ')' };
                    
                    // Cache editor globally for faster access on subsequent calls
                    window.__flowSlateEditor = editor;
                    
                    try {
                        // NUCLEAR CLEAR: Remove all nodes except one empty paragraph
                        // This is more reliable than select-all + deleteFragment
                        while (editor.children.length > 1) {
                            editor.apply({
                                type: 'remove_node',
                                path: [editor.children.length - 1],
                                node: editor.children[editor.children.length - 1],
                            });
                        }
                        // Clear the remaining first node's text
                        if (editor.children.length > 0) {
                            const firstNode = editor.children[0];
                            const textNode = firstNode.children ? firstNode.children[0] : firstNode;
                            if (textNode && textNode.text && textNode.text.length > 0) {
                                editor.apply({
                                    type: 'remove_text',
                                    path: firstNode.children ? [0, 0] : [0],
                                    offset: 0,
                                    text: textNode.text,
                                });
                            }
                        }
                        
                        // Set cursor at start
                        const insertPath = editor.children[0]?.children ? [0, 0] : [0];
                        editor.selection = {
                            anchor: { path: insertPath, offset: 0 },
                            focus: { path: insertPath, offset: 0 },
                        };
                        
                        // Insert the new text
                        editor.insertText(text);
                        
                        // Verify
                        const content = el.textContent || '';
                        return { ok: true, length: content.length, first50: content.substring(0, 50) };
                    } catch(e) {
                        return { ok: false, reason: 'operation failed: ' + e.message };
                    }
                }
            """, clean_prompt)
            await asyncio.sleep(0.3)
            
            if result and result.get('ok'):
                prompt_verified = await _verify_prompt("Slate-internals (atomic)")
            else:
                reason = result.get('reason', 'unknown') if result else 'null result'
                logger.warning("FlowCamoufoxEngine: [submit_id=%d] Slate internals failed: %s", submit_id, reason)
        except Exception as e:
            logger.warning("FlowCamoufoxEngine: [submit_id=%d] Slate internals error: %s", submit_id, e)

        # ── FALLBACK: Use cached editor (faster, skips fiber walk) ──
        if not prompt_verified:
            try:
                result = await self._page.evaluate("""
                    (text) => {
                        const editor = window.__flowSlateEditor;
                        if (!editor || typeof editor.insertText !== 'function') {
                            return { ok: false, reason: 'no cached editor' };
                        }
                        const el = document.querySelector('[data-slate-editor="true"]');
                        try {
                            // Nuclear clear via select-all approach
                            while (editor.children.length > 1) {
                                editor.apply({
                                    type: 'remove_node',
                                    path: [editor.children.length - 1],
                                    node: editor.children[editor.children.length - 1],
                                });
                            }
                            if (editor.children.length > 0) {
                                const firstNode = editor.children[0];
                                const textNode = firstNode.children ? firstNode.children[0] : firstNode;
                                if (textNode && textNode.text && textNode.text.length > 0) {
                                    editor.apply({
                                        type: 'remove_text',
                                        path: firstNode.children ? [0, 0] : [0],
                                        offset: 0,
                                        text: textNode.text,
                                    });
                                }
                            }
                            const insertPath = editor.children[0]?.children ? [0, 0] : [0];
                            editor.selection = {
                                anchor: { path: insertPath, offset: 0 },
                                focus: { path: insertPath, offset: 0 },
                            };
                            editor.insertText(text);
                            const content = (el ? el.textContent : '') || '';
                            return { ok: true, length: content.length };
                        } catch(e) {
                            return { ok: false, reason: 'cached editor failed: ' + e.message };
                        }
                    }
                """, clean_prompt)
                await asyncio.sleep(0.3)
                if result and result.get('ok'):
                    prompt_verified = await _verify_prompt("Slate-cached (atomic)")
                else:
                    reason = result.get('reason', 'unknown') if result else 'null result'
                    logger.warning("FlowCamoufoxEngine: [submit_id=%d] Cached editor failed: %s", submit_id, reason)
            except Exception as e:
                logger.warning("FlowCamoufoxEngine: [submit_id=%d] Cached editor error: %s", submit_id, e)

        # ── LAST RESORT: Ctrl+A → Backspace → keyboard.type(delay=2) ──
        if not prompt_verified:
            try:
                await slate_el.first.click(force=True)
                await asyncio.sleep(0.1)
                await self._page.keyboard.press('Control+a')
                await asyncio.sleep(0.05)
                await self._page.keyboard.press('Backspace')
                await asyncio.sleep(0.15)
                t0 = time.monotonic()
                await self._page.keyboard.type(clean_prompt, delay=2)
                elapsed = time.monotonic() - t0
                await asyncio.sleep(0.2)
                logger.info("FlowCamoufoxEngine: [submit_id=%d] keyboard.type(delay=2) done (%d chars in %.1fs)",
                           submit_id, len(clean_prompt), elapsed)
                prompt_verified = await _verify_prompt("keyboard.type (last resort)")
            except Exception as e:
                logger.warning("FlowCamoufoxEngine: [submit_id=%d] keyboard.type error: %s", submit_id, e)

        # ── FINAL SAFETY: Refuse to submit wrong prompt ──
        if not prompt_verified:
            actual_content = await _read_editor()
            raise RuntimeError(
                f"[submit_id={submit_id}] ALL strategies failed. "
                f"Editor has '{actual_content[:60]}...' but expected '{clean_prompt[:60]}...'. "
                f"Refusing to submit wrong prompt."
            )

        # ── Submit: click arrow button via JS (no Playwright click) ──
        clicked = await self._page.evaluate("""
            () => {
                const btns = document.querySelectorAll('button');
                for (const btn of btns) {
                    if (!btn.offsetParent) continue;
                    const text = btn.textContent.trim();
                    if (text.includes('arrow_forward')) {
                        btn.dispatchEvent(new PointerEvent('pointerdown', {bubbles: true}));
                        btn.dispatchEvent(new PointerEvent('pointerup', {bubbles: true}));
                        btn.dispatchEvent(new MouseEvent('click', {bubbles: true}));
                        return true;
                    }
                }
                return false;
            }
        """)
        if not clicked:
            logger.warning("FlowCamoufoxEngine: Submit button not found, pressing Enter")
            await self._page.keyboard.press('Enter')

        await asyncio.sleep(0.3)

        logger.info("FlowCamoufoxEngine: Prompt submitted! (submit_id=%d, prompt='%s...', active=%d)", 
                   submit_id, prompt[:60].replace('\n', ' '), self._active_slots)
        
        return submit_id

    async def _poll_js_response(self, submit_id: int, timeout: float = 120.0) -> dict:
        """Poll ``window.__flowResponseQueue`` in the page for this submit's response.

        Each poll runs a small script through ``self._page.evaluate`` that scans
        the queue for an entry whose ``submitId`` equals ``submit_id``, removes
        that entry and returns its ``data``. Entries tagged with other submit
        ids are left in place, so polls for different ids can run concurrently
        without taking each other's response.

        Args:
            submit_id: Identifier of the submission whose response is wanted.
            timeout: Maximum number of seconds to keep polling; polls are
                spaced one second apart.

        Returns:
            The ``data`` payload of the matching queue entry.

        Raises:
            RuntimeError: If no matching entry appears before ``timeout`` elapses.
        """
        started = time.monotonic()
        deadline = started + timeout
        poll_count = 0

        while time.monotonic() < deadline:
            poll_count += 1

            # Check __flowResponseQueue for EXACT submitId match only
            try:
                result = await self._page.evaluate("""
                    (sid) => {
                        const q = window.__flowResponseQueue || [];
                        for (let i = 0; i < q.length; i++) {
                            if (q[i].submitId === sid) {
                                return q.splice(i, 1)[0].data;
                            }
                        }
                        return null;
                    }
                """, submit_id)
                # Compare with None instead of truthiness: the script has already
                # removed the entry from the queue, so treating an empty payload
                # as "nothing yet" would lose it and guarantee a timeout.
                if result is not None:
                    logger.info("FlowCamoufoxEngine: [PARALLEL] Response captured (poll #%d, submit_id=%d, keys=%s)",
                               poll_count, submit_id, list(result.keys()))
                    return result
            except Exception as e:
                # Warn only for the first failures to avoid one warning per second;
                # later failures stay visible at debug level.
                if poll_count <= 2:
                    logger.warning("FlowCamoufoxEngine: [PARALLEL] JS queue poll error (submit_id=%d): %s", submit_id, e)
                else:
                    logger.debug("FlowCamoufoxEngine: [PARALLEL] JS queue poll error (submit_id=%d, poll #%d): %s",
                                 submit_id, poll_count, e)

            if poll_count % 15 == 0:
                logger.info("FlowCamoufoxEngine: [PARALLEL] Waiting for submit_id=%d... (poll #%d, elapsed=%.0fs)",
                           submit_id, poll_count, time.monotonic() - started)

            await asyncio.sleep(1.0)

        raise RuntimeError(
            f"Timeout ({timeout}s) waiting for response (submit_id={submit_id}). "
            f"JS interceptor did not capture batchGenerateImages response."
        )

    async def generate_sequential(
        self,
        prompt: str,
        output_path: Path | str,
        aspect_ratio: str = "IMAGE_ASPECT_RATIO_PORTRAIT",
        seed: int | None = None,
        reference_media_id: str | None = None,
        reference_weight: float = 0.85,
    ) -> tuple[Path, str | None]:
        """Generate ONE image: serialized submit, then an unlocked poll for the result.

        Phase 1 (inside ``self._sequential_lock``):
            optionally set the reference image, then submit the prompt.
        Phase 2 (lock released):
            poll ``__flowResponseQueue`` for THIS submit_id and save the image.

        Because the lock is released before polling, several calls can wait
        for their responses at the same time while submits stay one-at-a-time.

        Args:
            prompt: Prompt text to submit.
            output_path: Destination file; parent directories are created.
            aspect_ratio: Aspect-ratio identifier forwarded to the submit step.
            seed: Accepted for interface compatibility; not used by this method.
            reference_media_id: When given, ``set_reference_image`` is called
                with it before submitting.
            reference_weight: Weight passed along with ``reference_media_id``.

        Returns:
            ``(saved_path, media_id)`` as produced by ``_extract_and_save_image``.

        Raises:
            RuntimeError: If the engine is not started, if session recovery
                fails, or if the API keeps returning an error after all retries.
            Exception: The last failure is re-raised once retries are exhausted.
        """
        if not self._ready:
            raise RuntimeError("FlowCamoufoxEngine not started.")

        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        # Enable sequential mode — prevents _on_response_handler from calling
        # response.json() which blocks Playwright IPC and deadlocks page.evaluate()
        self._sequential_mode = True

        logger.info("FlowCamoufoxEngine: [PARALLEL] Queued → %s (prompt='%s...')",
                    output_path.name, prompt[:50].replace('\n', ' '))

        last_error = None
        for attempt in range(self.MAX_RETRIES + 1):
            # Reset per attempt so a failure during Phase 1 is never logged with
            # the submit_id that belonged to a previous attempt.
            submit_id: int | None = None

            if attempt > 0:
                backoff = self.RETRY_BACKOFF[min(attempt - 1, len(self.RETRY_BACKOFF) - 1)]
                if last_error and self._is_recaptcha_error(last_error):
                    # Recovery touches the shared page, so it is serialized
                    # with submits through the same lock.
                    async with self._sequential_lock:
                        try:
                            if attempt <= 2:
                                await self._refresh_session()
                            else:
                                await self._full_restart()
                        except Exception as recovery_err:
                            raise RuntimeError(f"Session recovery failed: {recovery_err}") from recovery_err
                await asyncio.sleep(backoff)

            try:
                # ── Phase 1: SUBMIT (SERIALIZED — one paste+click at a time) ──
                async with self._sequential_lock:
                    # Set reference image if provided
                    if reference_media_id:
                        await self.set_reference_image(reference_media_id, reference_weight)

                    # Submit prompt (Slate insert + click)
                    submit_id = await self._submit_prompt_fifo(prompt, aspect_ratio)
                    logger.info("FlowCamoufoxEngine: [PARALLEL] Phase 1 done — submit_id=%d released lock → %s",
                               submit_id, output_path.name)

                # ── Phase 2: POLL (PARALLEL — multiple polls run simultaneously) ──
                response = await self._poll_js_response(submit_id, timeout=120.0)

                logger.info("FlowCamoufoxEngine: [PARALLEL] Response for submit_id=%d, keys=%s",
                           submit_id, list(response.keys()))

                if "error" in response:
                    last_error = self._extract_error(response)
                    logger.error("FlowCamoufoxEngine: [PARALLEL] API error (submit_id=%d): %s", submit_id, last_error)
                    if attempt < self.MAX_RETRIES:
                        continue
                    raise RuntimeError(f"Flow API error after {self.MAX_RETRIES} retries: {last_error}")

                # Extract and save image
                saved_path, media_id = await self._extract_and_save_image(response, output_path)
                if media_id:
                    self._last_media_id = media_id

                logger.info("FlowCamoufoxEngine: [PARALLEL] ✓ Image saved → %s (submit_id=%d, media_id=%s)",
                           saved_path.name, submit_id, media_id[:20] if media_id else "none")

                return saved_path, media_id

            except Exception as e:
                last_error = str(e)
                logger.error("FlowCamoufoxEngine: [PARALLEL] Attempt %d/%d failed (submit_id=%s): %s",
                            attempt + 1, self.MAX_RETRIES + 1,
                            '?' if submit_id is None else submit_id, e)
                if attempt < self.MAX_RETRIES:
                    continue
                raise

        raise RuntimeError("Unreachable")

    async def _poll_js_generation(self, timeout: float = 90.0) -> dict:
        """Wait until ``window.__flowLastGeneration`` carries a response.

        Reads ``window.__flowLastGeneration`` from the page once per loop
        iteration (iterations are separated by a one-second sleep) and returns
        its ``response`` member as soon as that member is truthy. Errors raised
        while reading are logged as warnings and polling continues.

        Raises:
            RuntimeError: If ``timeout`` seconds pass without a response.
        """
        give_up_at = time.monotonic() + timeout
        attempts = 0

        while time.monotonic() < give_up_at:
            attempts += 1
            try:
                snapshot = await self._page.evaluate("() => window.__flowLastGeneration")
                if snapshot and snapshot.get("response"):
                    payload = snapshot["response"]
                    logger.info("FlowCamoufoxEngine: Response captured via JS poll (poll #%d, keys=%s)",
                               attempts, list(payload.keys()))
                    return payload

                # Every tenth poll, report what the page variable currently looks like.
                if attempts % 10 == 0:
                    if snapshot is None:
                        shape = 'null'
                    elif isinstance(snapshot, dict):
                        shape = list(snapshot.keys())
                    else:
                        shape = type(snapshot).__name__
                    logger.info("FlowCamoufoxEngine: Polling for response... (poll #%d, gen=%s)",
                               attempts, shape)
            except Exception as poll_err:
                logger.warning("FlowCamoufoxEngine: JS poll error: %s", poll_err)

            await asyncio.sleep(1.0)

        raise RuntimeError(f"Timeout waiting for Flow generation response ({timeout}s). JS interceptor did not capture batchGenerateImages.")

    async def _wait_for_response_by_id(self, submit_id: int, timeout: float = 90.0) -> dict:
        """Wait for a response for ``submit_id`` from the JS queue or the Python queue.

        Layer A (JS queue): ``window.__flowResponseQueue`` is scanned for an
            entry whose ``submitId`` equals ``submit_id``; the entry is removed
            and its ``data`` returned.
        Layer B (Python queue): if Layer A finds nothing and
            ``self._response_queue`` is non-empty, its first element is taken.
            This layer is plain FIFO and is NOT correlated with ``submit_id``.

        No lock is taken here, so several waits can run at the same time.

        Args:
            submit_id: Identifier of the submission whose response is wanted.
            timeout: Maximum number of seconds to keep polling; polls are
                spaced one second apart.

        Returns:
            The response payload from whichever layer produced one first.

        Raises:
            RuntimeError: If neither layer yields a response before ``timeout``.
        """
        deadline = time.monotonic() + timeout
        poll_count = 0

        while time.monotonic() < deadline:
            poll_count += 1

            # ── Layer A: JS __flowResponseQueue (has submitId correlation) ──
            try:
                result = await self._page.evaluate("""
                    (sid) => {
                        const q = window.__flowResponseQueue || [];
                        for (let i = 0; i < q.length; i++) {
                            if (q[i].submitId === sid) {
                                const item = q.splice(i, 1)[0];
                                return item.data;
                            }
                        }
                        return null;
                    }
                """, submit_id)
                # Compare with None instead of truthiness: the entry has already
                # been spliced out of the JS queue, so skipping an empty payload
                # here would silently lose this submit's response.
                if result is not None:
                    logger.info("FlowCamoufoxEngine: [QUEUE-JS] Response for submit_id=%d captured (poll #%d, keys=%s)",
                               submit_id, poll_count, list(result.keys()))
                    return result
            except Exception as e:
                # Only the first failures are logged to avoid one warning per second.
                if poll_count <= 2:
                    logger.warning("FlowCamoufoxEngine: [QUEUE-JS] Poll error for submit_id=%d: %s", submit_id, e)

            # ── Layer B: Python _response_queue (FIFO fallback from Playwright network) ──
            if self._response_queue:
                data = self._response_queue.pop(0)
                logger.info("FlowCamoufoxEngine: [QUEUE-PY] Response claimed from Python queue for submit_id=%d (keys=%s, remaining=%d)",
                           submit_id, list(data.keys()), len(self._response_queue))
                return data

            if poll_count % 15 == 0:
                # Periodic progress log including the ids currently sitting in the JS queue.
                try:
                    queue_status = await self._page.evaluate("""
                        () => {
                            const q = window.__flowResponseQueue || [];
                            return {len: q.length, ids: q.map(x => x.submitId)};
                        }
                    """)
                except Exception:
                    queue_status = "unavailable"
                logger.info("FlowCamoufoxEngine: [QUEUE] Waiting for submit_id=%d... (poll #%d, js_queue=%s, py_queue=%d)",
                           submit_id, poll_count, queue_status, len(self._response_queue))

            await asyncio.sleep(1.0)

        raise RuntimeError(
            f"Timeout waiting for response (submit_id={submit_id}, timeout={timeout}s). "
            f"Neither JS interceptor queue nor Playwright network handler captured the response."
        )

    async def _submit_and_wait(self, prompt: str, aspect_ratio: str) -> dict:
        """Submit a prompt and await its response through a registered Future.

        Architecture:
          Phase 1 (inside ``self._submission_lock``): register a Future in
            ``self._pending_responses`` (or consume a pre-queued response from
            ``self._response_queue``), then submit the prompt.
          Phase 2 (lock released): await the Future with a 120 s timeout.

        A slot from ``self._slot_semaphore`` (created on first use with
        ``self._max_slots`` permits) is held for the whole call.

        Args:
            prompt: Prompt text to submit.
            aspect_ratio: Aspect-ratio identifier forwarded to the submit step.

        Returns:
            The response payload the Future was resolved with.

        Raises:
            RuntimeError: If no response arrives within 120 seconds.
        """
        logger.info("FlowCamoufoxEngine: _submit_and_wait called, prompt=%s...",
                    prompt[:80].replace('\n', ' '))

        if self._slot_semaphore is None:
            self._slot_semaphore = asyncio.Semaphore(self._max_slots)

        await self._slot_semaphore.acquire()
        self._active_slots += 1
        logger.info("FlowCamoufoxEngine: Acquired slot (active=%d/%d)",
                   self._active_slots, self._max_slots)

        response_future = None
        try:
            # Create a Future that will be resolved by _on_response_handler.
            # get_running_loop() is the supported way to reach the loop from
            # inside a coroutine.
            response_future = asyncio.get_running_loop().create_future()

            # ── Phase 1: SUBMIT (lock only during clipboard + click) ──
            async with self._submission_lock:
                # Check if there's a pre-queued response (from _response_queue)
                # NOTE(review): a pre-queued response is consumed here, yet the
                # prompt is still submitted below — confirm this is intended.
                if self._response_queue:
                    data = self._response_queue.pop(0)
                    logger.info("FlowCamoufoxEngine: Found pre-queued response, using it directly")
                    response_future.set_result(data)
                else:
                    # Register Future BEFORE submitting (ensures handler finds it)
                    self._pending_responses.append(response_future)

                submit_id = await self._submit_prompt_fifo(prompt, aspect_ratio)
                logger.info("FlowCamoufoxEngine: Prompt submitted (submit_id=%d), pending_futures=%d",
                           submit_id, len(self._pending_responses))
                await asyncio.sleep(1.0)  # ensure fetch dispatches before next submit

            # ── Phase 2: WAIT (no lock, no polling — just await the Future) ──
            try:
                response = await asyncio.wait_for(response_future, timeout=120.0)
            except asyncio.TimeoutError:
                # Remove our future first so the count in the message below
                # reflects only the other waiters.
                if response_future in self._pending_responses:
                    self._pending_responses.remove(response_future)
                raise RuntimeError(
                    f"Timeout (120s) waiting for response (submit_id={submit_id}). "
                    f"pending_futures={len(self._pending_responses)}"
                )

            return response
        finally:
            # If the submit step raised or this task was cancelled, the Future
            # may still be registered; drop it so it cannot linger in the
            # pending list after this call has ended.
            if response_future is not None and response_future in self._pending_responses:
                self._pending_responses.remove(response_future)
            self._active_slots -= 1
            self._slot_semaphore.release()
            logger.info("FlowCamoufoxEngine: Released slot (active=%d/%d)",
                       self._active_slots, self._max_slots)

    @staticmethod
    def _extract_error(response: dict) -> str:
        """Extract human-readable error from API error response."""
        err = response.get("error", {})
        if isinstance(err, dict):
            code = err.get("code", "")
            msg = err.get("message", "")
            status = err.get("status", "")
            return f"[{code}] {status}: {msg}" if code else str(err)
        return str(err)

    async def _extract_and_save_image(self, response: dict, output_path: Path) -> tuple[Path, str | None]:
        """Extract the first image from an API response and write it to disk.

        Handles two response formats:
        1. imagePanels format: { imagePanels: [{ generatedImages: [{ encodedImage: "base64..." }] }] }
        2. media format: { media: [{ image: { generatedImage: { fifeUrl: "..." } } }] }

        For the media format the URL is first fetched inside the page via
        ``self._page.evaluate``; if that yields nothing or raises, the URL is
        fetched with ``self._http_client``.

        Args:
            response: Parsed API response.
            output_path: File the image bytes are written to.

        Returns:
            ``(output_path, media_id)`` where ``media_id`` is the image's
            ``mediaGenerationId`` (``None`` when absent).

        Raises:
            RuntimeError: If the response holds no usable image, or if the
                HTTP-client download fails (transport error or non-200 status).
        """
        import base64 as _b64

        # ``or []`` / ``or {}`` below also cover keys that are present but null,
        # which ``dict.get(key, default)`` alone would pass through as None.

        # ── Format 1: imagePanels (base64-encoded image) ──
        panels = response.get("imagePanels") or []
        if panels:
            images = panels[0].get("generatedImages") or []
            if images:
                img = images[0]
                media_id = img.get("mediaGenerationId")
                encoded = img.get("encodedImage")
                if encoded:
                    image_bytes = _b64.b64decode(encoded)
                    output_path.write_bytes(image_bytes)
                    logger.info("FlowCamoufoxEngine: Image saved (base64/imagePanels) → %s (%d bytes)",
                               output_path.name, len(image_bytes))
                    return output_path, media_id

        # ── Format 2: media with fifeUrl ──
        media_list = response.get("media") or []
        if media_list:
            first_media = media_list[0]
            image_data = (first_media.get("image") or {}).get("generatedImage") or {}
            fife_url = image_data.get("fifeUrl") or ""
            media_id = image_data.get("mediaGenerationId")

            if fife_url:
                # Try download through browser first (has auth cookies)
                try:
                    image_b64 = await self._page.evaluate("""
                        async (url) => {
                            try {
                                const resp = await fetch(url);
                                if (!resp.ok) return null;
                                const blob = await resp.blob();
                                return new Promise((resolve, reject) => {
                                    const reader = new FileReader();
                                    reader.onloadend = () => resolve(reader.result.split(',')[1]);
                                    reader.onerror = reject;
                                    reader.readAsDataURL(blob);
                                });
                            } catch (e) {
                                return null;
                            }
                        }
                    """, fife_url)
                    if image_b64:
                        image_bytes = _b64.b64decode(image_b64)
                        output_path.write_bytes(image_bytes)
                        logger.info("FlowCamoufoxEngine: Image saved (browser fetch) → %s (%d bytes)",
                                   output_path.name, len(image_bytes))
                        return output_path, media_id
                except Exception as e:
                    logger.warning("FlowCamoufoxEngine: Browser download failed: %s, trying HTTP client", e)

                # Fallback: plain HTTP client.  Transport failures are wrapped
                # so this method only ever signals failure with RuntimeError,
                # as its contract states.
                try:
                    dl_resp = await self._http_client.get(fife_url)
                except Exception as dl_err:
                    raise RuntimeError(
                        f"Image download failed: {dl_err} from {fife_url[:80]}"
                    ) from dl_err

                if dl_resp.status_code != 200:
                    raise RuntimeError(f"Image download failed: HTTP {dl_resp.status_code} from {fife_url[:80]}")

                output_path.write_bytes(dl_resp.content)
                logger.info("FlowCamoufoxEngine: Image saved (HTTP client) → %s (%d bytes)",
                           output_path.name, len(dl_resp.content))
                return output_path, media_id

        # ── Neither format found ──
        raise RuntimeError(
            f"No image data in response. Top-level keys: {list(response.keys())}. "
            f"imagePanels={len(panels)}, media={len(media_list)}"
        )

    async def _generate_impl_fast(
        self,
        prompt: str,
        output_path: Path,
        aspect_ratio: str,
        seed: int | None,
    ) -> tuple[Path, str | None]:
        """Generate one image for batch mode and return as soon as it is saved.

        Runs up to ``MAX_RETRIES + 1`` attempts.  Before each retry it sleeps
        for the matching ``RETRY_BACKOFF`` entry; when the previous failure is
        classified as a reCAPTCHA error it first refreshes the session
        (retries 1–2) or performs a full restart (later retries).

        Returns (saved_path, media_id) — media_id is needed for coverage shot references.
        """
        if not self._ready:
            raise RuntimeError("FlowCamoufoxEngine not started.")

        output_path.parent.mkdir(parents=True, exist_ok=True)
        logger.info("FlowCamoufoxEngine: [BATCH] Generating → %s", output_path.name)

        failure = None
        for attempt_no in range(self.MAX_RETRIES + 1):
            out_of_retries = attempt_no >= self.MAX_RETRIES

            if attempt_no > 0:
                # Clamp the index so late retries reuse the last backoff value.
                delay_idx = min(attempt_no - 1, len(self.RETRY_BACKOFF) - 1)
                delay = self.RETRY_BACKOFF[delay_idx]
                if failure and self._is_recaptcha_error(failure):
                    try:
                        if attempt_no > 2:
                            await self._full_restart()
                        else:
                            await self._refresh_session()
                    except Exception as recovery_err:
                        raise RuntimeError(
                            f"Session recovery failed: {recovery_err}"
                        ) from recovery_err
                await asyncio.sleep(delay)

            response = await self._submit_and_wait(prompt, aspect_ratio)
            logger.info("FlowCamoufoxEngine: [BATCH] Response keys: %s", list(response.keys()))

            if "error" in response:
                failure = self._extract_error(response)
                logger.error("FlowCamoufoxEngine: API error — %s", failure)
                if out_of_retries:
                    raise RuntimeError(
                        f"Flow API error after {self.MAX_RETRIES} retries: {failure}"
                    )
                continue

            # Extract and save image (handles both imagePanels and media formats)
            try:
                saved_path, media_id = await self._extract_and_save_image(response, output_path)
                if media_id:
                    self._last_media_id = media_id
                return saved_path, media_id
            except RuntimeError as img_err:
                failure = str(img_err)
                logger.error("FlowCamoufoxEngine: [BATCH] Image extraction failed: %s", failure)
                if out_of_retries:
                    raise

        raise RuntimeError("Unreachable")

    async def _generate_impl(
        self,
        prompt: str,
        output_path: Path,
        aspect_ratio: str,
        seed: int | None,
    ) -> Path:
        """Generate one image with retries, then apply the post-generation cooldown.

        Runs up to ``MAX_RETRIES + 1`` attempts.  Before each retry it either
        performs reCAPTCHA recovery (session refresh for retries 1–2, full
        restart afterwards) or logs the retry, then sleeps for the matching
        ``RETRY_BACKOFF`` entry.  After a successful save it calls
        ``_human_like_behavior()`` and sleeps ``GENERATION_COOLDOWN`` seconds.

        Returns the path the image was saved to.
        """
        if not self._ready:
            raise RuntimeError("FlowCamoufoxEngine not started.")

        output_path.parent.mkdir(parents=True, exist_ok=True)
        logger.info("FlowCamoufoxEngine: Generating → %s (aspect=%s)", output_path.name, aspect_ratio)

        failure = None
        for attempt_no in range(self.MAX_RETRIES + 1):
            out_of_retries = attempt_no >= self.MAX_RETRIES

            if attempt_no > 0:
                # Clamp the index so late retries reuse the last backoff value.
                delay = self.RETRY_BACKOFF[min(attempt_no - 1, len(self.RETRY_BACKOFF) - 1)]

                if not (failure and self._is_recaptcha_error(failure)):
                    logger.warning(
                        "FlowCamoufoxEngine: Retry %d/%d after %ds backoff (last error: %s)",
                        attempt_no, self.MAX_RETRIES, delay, failure,
                    )
                else:
                    try:
                        if attempt_no > 2:
                            logger.warning(
                                "FlowCamoufoxEngine: reCAPTCHA recovery L2 — full restart (attempt %d/%d)",
                                attempt_no, self.MAX_RETRIES,
                            )
                            await self._full_restart()
                        else:
                            logger.warning(
                                "FlowCamoufoxEngine: reCAPTCHA recovery L1 — page refresh (attempt %d/%d)",
                                attempt_no, self.MAX_RETRIES,
                            )
                            await self._refresh_session()
                    except Exception as recovery_err:
                        logger.error("FlowCamoufoxEngine: Recovery failed — %s", recovery_err)
                        raise RuntimeError(
                            f"Session recovery failed: {recovery_err}"
                        ) from recovery_err

                await asyncio.sleep(delay)

            response = await self._submit_and_wait(prompt, aspect_ratio)
            logger.info("FlowCamoufoxEngine: Response keys: %s", list(response.keys()))

            if "error" in response:
                failure = self._extract_error(response)
                logger.error("FlowCamoufoxEngine: API error — %s", failure)
                if out_of_retries:
                    raise RuntimeError(
                        f"Flow API error after {self.MAX_RETRIES} retries: {failure}"
                    )
                continue

            # Extract and save image (handles both imagePanels and media formats)
            try:
                saved_path, media_id = await self._extract_and_save_image(response, output_path)
                if media_id:
                    self._last_media_id = media_id
                    logger.debug("FlowCamoufoxEngine: Captured media_id=%s…", media_id[:20])
            except RuntimeError as img_err:
                failure = str(img_err)
                logger.error("FlowCamoufoxEngine: Image extraction failed: %s", failure)
                if out_of_retries:
                    raise
                continue

            await self._human_like_behavior()
            await asyncio.sleep(self.GENERATION_COOLDOWN)
            return saved_path

        raise RuntimeError("Unreachable")


# ─── Global singleton ─────────────────────────────────────────────────────────

# Module-wide engine instance; read, replaced and cleared by
# get_camoufox_engine() through ``global _engine``.
_engine: Optional[FlowCamoufoxEngine] = None
# Serializes engine creation/replacement inside get_camoufox_engine().
# NOTE(review): this lock is constructed at import time. On Python versions
# before 3.10 an asyncio.Lock is tied to the event loop that is current when it
# is constructed — confirm the minimum supported Python version, or create the
# lock lazily inside the running loop.
_engine_lock = asyncio.Lock()


def _cookie_value(cookies: list[dict[str, Any]], name: str) -> str:
    """Return the value of the first cookie called *name*, or "" if absent."""
    return next((c.get("value", "") for c in cookies if c.get("name") == name), "")


async def get_camoufox_engine(
    session_token: str | None = None, csrf_token: str | None = None
) -> FlowCamoufoxEngine:
    """Get or create the global FlowCamoufoxEngine singleton.

    Credentials are resolved in this order (first non-empty value wins):
      1. the ``session_token`` / ``csrf_token`` arguments,
      2. the ``GOOGLE_FLOW_SESSION_TOKEN`` / ``GOOGLE_FLOW_CSRF_TOKEN`` env vars,
      3. the matching next-auth cookies inside the ``GOOGLE_FLOW_COOKIES`` JSON.

    When ``GOOGLE_FLOW_COOKIES`` holds a JSON array, its cookie objects are also
    passed to the engine as ``cookies=``.

    Returns:
        The running engine instance.

    Raises:
        RuntimeError: if no session/CSRF token could be resolved, or if the
            engine's ``start()`` reports failure. Exceptions raised by
            ``start()`` itself propagate unchanged.
    """
    global _engine

    # Fast path: skip the lock when a live engine already exists.
    if _engine and _engine.is_running:
        return _engine

    async with _engine_lock:
        # Another caller may have (re)started the engine while we waited.
        if _engine and _engine.is_running:
            return _engine

        # Engine exists but is no longer running - clean it up before replacing it.
        if _engine:
            logger.warning("FlowCamoufoxEngine: Browser was closed, restarting...")
            try:
                await _engine.stop()
            except Exception as stop_err:
                # Best effort: the old engine is being discarded anyway.
                logger.debug(
                    "FlowCamoufoxEngine: Ignoring error while stopping dead engine: %s",
                    stop_err,
                )
            _engine = None

        logger.info("FlowCamoufoxEngine: Creating new engine instance...")

        # Explicit arguments and individual env vars take priority over the
        # GOOGLE_FLOW_COOKIES JSON, which is only used to fill in what is missing.
        session = session_token or os.getenv("GOOGLE_FLOW_SESSION_TOKEN", "")
        csrf = csrf_token or os.getenv("GOOGLE_FLOW_CSRF_TOKEN", "")

        # Full cookie list from JSON (for injecting ALL cookies into the browser).
        full_cookies: list[dict[str, str]] | None = None
        cookies_json = os.getenv("GOOGLE_FLOW_COOKIES", "")
        if cookies_json:
            try:
                parsed_cookies = json.loads(cookies_json)
                if not isinstance(parsed_cookies, list):
                    raise ValueError(
                        f"expected a JSON array, got {type(parsed_cookies).__name__}"
                    )
            except (ValueError, TypeError, RecursionError) as e:
                # json.JSONDecodeError is a ValueError. A bad payload is ignored
                # entirely so it is never forwarded to the engine.
                logger.warning("FlowCamoufoxEngine: Failed to parse GOOGLE_FLOW_COOKIES: %s", e)
            else:
                # Only cookie objects are usable; drop any stray non-dict entries.
                full_cookies = [c for c in parsed_cookies if isinstance(c, dict)]
                if not session:
                    session = _cookie_value(full_cookies, "__Secure-next-auth.session-token")
                if not csrf:
                    csrf = _cookie_value(full_cookies, "__Host-next-auth.csrf-token")
                logger.info(
                    "FlowCamoufoxEngine: Parsed %d cookies from GOOGLE_FLOW_COOKIES JSON",
                    len(full_cookies),
                )

        if not session or not csrf:
            raise RuntimeError("Google Flow cookies not configured.")

        engine = FlowCamoufoxEngine(session, csrf, cookies=full_cookies)
        _engine = engine
        logger.info("FlowCamoufoxEngine: Starting engine...")
        started = False
        try:
            started = await engine.start()
        finally:
            if not started:
                # start() returned False or raised: release whatever was
                # opened instead of leaving a half-started engine behind.
                try:
                    await engine.stop()
                except Exception as cleanup_err:
                    logger.debug(
                        "FlowCamoufoxEngine: Ignoring error while cleaning up failed start: %s",
                        cleanup_err,
                    )
                if _engine is engine:
                    _engine = None
        if not started:
            raise RuntimeError("Failed to start FlowCamoufoxEngine")
        logger.info("FlowCamoufoxEngine: Engine started successfully!")
        return engine


async def stop_camoufox_engine() -> None:
    """Stop the global engine and the generation service.

    The global engine reference is cleared before the engine is stopped, and
    the generation service is stopped even when stopping the engine raises;
    in that case the engine's exception still propagates to the caller.
    """
    global _engine
    engine, _engine = _engine, None
    try:
        if engine:
            await engine.stop()
    finally:
        # Always shut the dispatcher down, otherwise a failing engine.stop()
        # would leave the service and its job tasks running.
        await _generation_service.stop()


# ─── Generation Service (persistent worker queue) ────────────────────────────


@dataclass
class _GenerationJob:
    """A single image generation request queued for the worker."""
    prompt: str
    output_path: Path
    aspect_ratio: str
    seed: int | None
    reference_media_id: str | None
    reference_weight: float
    future: asyncio.Future = field(default_factory=lambda: None)
    job_id: int = 0


class GenerationService:
    """Persistent service that dispatches image generation jobs in PARALLEL.
    
    Architecture:
      - Jobs arrive via enqueue() → dispatched immediately as asyncio tasks
      - Engine's _submission_lock serializes only the paste (~2-5s each)
      - Engine's _slot_semaphore limits total in-flight to 10
      - Response waits run fully in parallel (30-60s each)
      - Dispatcher loop ensures engine is alive before each dispatch
      - If engine crashes, dispatcher restarts it for next job
      - Dispatcher NEVER dies — catches all exceptions and continues
    
    Flow for 6 concurrent requests:
      [enqueue] → dispatcher picks job → create_task(generate) → IMMEDIATELY picks next
      [enqueue] → dispatcher picks job → create_task(generate) → IMMEDIATELY picks next
      ...all 6 dispatched in ~12-30s (paste serialized) then all 6 wait in parallel
    """
    
    MAX_CONCURRENT = 10  # matches engine._max_slots
    
    def __init__(self):
        self._queue: asyncio.Queue[_GenerationJob] = asyncio.Queue()
        self._dispatcher_task: asyncio.Task | None = None
        self._job_counter: int = 0
        self._active_jobs: dict[int, asyncio.Task] = {}  # job_id → task
        self._started = False
    
    def _ensure_started(self):
        """Start the dispatcher loop if not already running."""
        if self._dispatcher_task is None or self._dispatcher_task.done():
            self._dispatcher_task = asyncio.get_event_loop().create_task(self._dispatcher_loop())
            self._started = True
            logger.info("GenerationService: Dispatcher started")
    
    @property
    def queue_size(self) -> int:
        return self._queue.qsize()
    
    @property
    def active_count(self) -> int:
        return len(self._active_jobs)
    
    async def enqueue(
        self,
        prompt: str,
        output_path: Path | str,
        aspect_ratio: str = "IMAGE_ASPECT_RATIO_PORTRAIT",
        seed: int | None = None,
        reference_media_id: str | None = None,
        reference_weight: float = 0.85,
    ) -> Path:
        """Enqueue a generation job and wait for the result.
        
        The job is dispatched in parallel with other jobs.
        The caller awaits until THIS specific job completes.
        """
        self._ensure_started()
        
        loop = asyncio.get_event_loop()
        future = loop.create_future()
        
        self._job_counter += 1
        job = _GenerationJob(
            prompt=prompt,
            output_path=Path(output_path),
            aspect_ratio=aspect_ratio,
            seed=seed,
            reference_media_id=reference_media_id,
            reference_weight=reference_weight,
            future=future,
            job_id=self._job_counter,
        )
        
        await self._queue.put(job)
        logger.info("GenerationService: Job #%d enqueued (queued=%d, active=%d, prompt=%.60s...)",
                    job.job_id, self._queue.qsize(), self.active_count,
                    prompt[:60].replace('\n', ' '))
        
        return await future
    
    async def _run_job(self, job: _GenerationJob):
        """Execute a single generation job. Runs as an independent asyncio task."""
        try:
            logger.info("GenerationService: ┌─ Job #%d STARTED (active=%d) ─┐",
                       job.job_id, self.active_count)
            
            # Get or restart engine (handles crashes automatically)
            engine = await get_camoufox_engine()
            
            if job.reference_media_id:
                await engine.set_reference_image(job.reference_media_id, job.reference_weight)
            
            try:
                # engine.generate() → _submit_and_wait():
                #   Phase 1 (SERIALIZED via _submission_lock): paste prompt (~2-5s)
                #   Phase 2 (PARALLEL): poll for response (~30-60s)
                result = await engine.generate(
                    job.prompt, job.output_path, job.aspect_ratio, job.seed
                )
                
                if not job.future.done():
                    job.future.set_result(result)
                logger.info("GenerationService: └─ Job #%d DONE → %s (active=%d) ─┘",
                           job.job_id, result, self.active_count - 1)
            finally:
                if job.reference_media_id:
                    try:
                        await engine.clear_reference_image()
                    except Exception:
                        pass
                        
        except Exception as exc:
            logger.error("GenerationService: └─ Job #%d FAILED: %s (active=%d) ─┘",
                        job.job_id, exc, self.active_count - 1)
            if not job.future.done():
                job.future.set_exception(exc)
        finally:
            self._active_jobs.pop(job.job_id, None)
    
    async def _dispatcher_loop(self):
        """Dispatcher: dequeues jobs and fires them as parallel tasks. Never dies."""
        logger.info("GenerationService: ═══ Dispatcher loop running ═══")
        while True:
            try:
                job = await self._queue.get()
                
                logger.info("GenerationService: Dispatching job #%d (queued=%d, active=%d)",
                           job.job_id, self._queue.qsize(), self.active_count)
                
                # Fire job as independent parallel task
                task = asyncio.get_event_loop().create_task(self._run_job(job))
                self._active_jobs[job.job_id] = task
                
                self._queue.task_done()
                
            except asyncio.CancelledError:
                logger.info("GenerationService: Dispatcher cancelled")
                break
            except Exception as exc:
                logger.error("GenerationService: Dispatcher error: %s", exc)
                await asyncio.sleep(1.0)
    
    async def stop(self):
        """Stop the dispatcher and cancel all active/pending jobs."""
        if self._dispatcher_task and not self._dispatcher_task.done():
            self._dispatcher_task.cancel()
            try:
                await self._dispatcher_task
            except asyncio.CancelledError:
                pass
        
        # Cancel active job tasks
        for job_id, task in list(self._active_jobs.items()):
            if not task.done():
                task.cancel()
        self._active_jobs.clear()
        
        # Fail queued jobs
        while not self._queue.empty():
            try:
                job = self._queue.get_nowait()
                if not job.future.done():
                    job.future.set_exception(RuntimeError("GenerationService stopped"))
            except asyncio.QueueEmpty:
                break
        
        logger.info("GenerationService: Stopped")


# Global service instance — module-level singleton used by
# generate_image_camoufox(), generate_images_batch_camoufox() and
# stop_camoufox_engine().
_generation_service = GenerationService()


async def generate_image_camoufox(
    prompt: str,
    output_path: Path | str,
    aspect_ratio: str = "IMAGE_ASPECT_RATIO_PORTRAIT",
    seed: int | None = None,
    reference_media_id: str | None = None,
    reference_weight: float = 0.85,
) -> Path:
    """High-level: generate an image through the global GenerationService.

    All arguments are forwarded unchanged to the service's enqueue(); this
    coroutine returns once the caller's own job has completed and yields
    whatever enqueue() returns. Errors raised for the job propagate to the
    caller.
    """
    request = {
        "prompt": prompt,
        "output_path": output_path,
        "aspect_ratio": aspect_ratio,
        "seed": seed,
        "reference_media_id": reference_media_id,
        "reference_weight": reference_weight,
    }
    saved_path = await _generation_service.enqueue(**request)
    return saved_path


async def generate_image_sequential(
    prompt: str,
    output_path: Path | str,
    aspect_ratio: str = "IMAGE_ASPECT_RATIO_PORTRAIT",
    seed: int | None = None,
    reference_media_id: str | None = None,
    reference_weight: float = 0.85,
) -> tuple[Path, str | None]:
    """Generate ONE image by calling the engine directly.

    Bypasses the GenerationService queue: the shared engine is obtained via
    get_camoufox_engine() and its generate_sequential() is awaited with the
    arguments forwarded unchanged.

    Returns (saved_path, media_id) as produced by generate_sequential().
    """
    shared_engine = await get_camoufox_engine()
    outcome = await shared_engine.generate_sequential(
        prompt=prompt,
        output_path=output_path,
        aspect_ratio=aspect_ratio,
        seed=seed,
        reference_media_id=reference_media_id,
        reference_weight=reference_weight,
    )
    return outcome


async def generate_images_batch_camoufox(
    items: list[tuple[str, Path | str]],
    aspect_ratio: str = "IMAGE_ASPECT_RATIO_PORTRAIT",
    batch_size: int = 10,
    on_progress: Optional[callable] = None,
    on_batch_complete: Optional[callable] = None,
) -> list[Path | None]:
    """High-level: generate multiple images via the worker queue."""
    results: list[Path | None] = []
    total = len(items)
    
    for idx, (prompt, out_path) in enumerate(items):
        try:
            path = await _generation_service.enqueue(
                prompt=prompt,
                output_path=out_path,
                aspect_ratio=aspect_ratio,
            )
            results.append(path)
            if on_progress:
                on_progress(idx + 1, total)
        except Exception as e:
            logger.error("GenerationService: Batch item %d/%d failed: %s", idx + 1, total, e)
            results.append(None)
    
    if on_batch_complete:
        on_batch_complete(1, 1)
    
    return results


# Aliases for compatibility with flow_browser_engine API:
# short names bound to the Camoufox implementations defined in this module.
get_engine = get_camoufox_engine
generate_image = generate_image_camoufox
generate_images_batch = generate_images_batch_camoufox
stop_engine = stop_camoufox_engine


if __name__ == "__main__":
    import sys
    import traceback

    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

    async def test():
        """Manual smoke test: start an engine, generate one image, shut down.

        Reads the session and CSRF tokens from the environment and exits with
        status 1 when either is missing. The engine is always stopped, even
        when start-up or generation fails.
        """
        rule = "=" * 60
        print(rule)
        print("Flow Camoufox Engine Test (Anti-Detect Browser)")
        print(rule)

        session = os.getenv("GOOGLE_FLOW_SESSION_TOKEN", "")
        csrf = os.getenv("GOOGLE_FLOW_CSRF_TOKEN", "")

        if not (session and csrf):
            print("ERROR: GOOGLE_FLOW_SESSION_TOKEN and GOOGLE_FLOW_CSRF_TOKEN must be set")
            sys.exit(1)

        engine = FlowCamoufoxEngine(session, csrf)
        try:
            print("\n1. Starting Camoufox engine...")
            started = await engine.start()
            if not started:
                print("   ✗ Failed to start engine")
                return

            print("   ✓ Engine started")

            print("\n2. Testing image generation...")
            try:
                image_path = await engine.generate(
                    prompt="a majestic dragon flying over a medieval castle at sunset, photorealistic",
                    output_path="./test_output/flow_camoufox_test.png",
                    aspect_ratio="IMAGE_ASPECT_RATIO_PORTRAIT",
                )
                print(f"   ✓ Generated image: {image_path}")
            except Exception as err:
                # Report the failure but still fall through to engine shutdown.
                print(f"   ✗ Error: {err}")
                traceback.print_exc()

        finally:
            await engine.stop()

        print("\n" + rule)
        print("Test complete")
        print(rule)

    asyncio.run(test())
