"""Async web scraper with rate limiting and concurrent page support.
The ``AsyncScraper`` class is the main entry point for IntelliScraper.
It orchestrates browser backends, rate limiting, and page management
to scrape one or many URLs concurrently.
Two browser modes are supported:
- **Local browser** (``use_local_browser=True``): Connects to your
running Chrome instance via CDP. All existing cookies and logins
are available immediately.
- **Managed browser** (default): Launches a fresh Chromium instance
with fingerprint spoofing, proxy, and session data.
Example::

    import asyncio

    from intelliscraper import AsyncScraper, ScrapStatus

    async def main():
        async with AsyncScraper(max_concurrent_pages=4) as scraper:
            response = await scraper.scrape("https://example.com")
            if response.status == ScrapStatus.SUCCESS:
                print(response.scrap_html_content)

    asyncio.run(main())
"""
from __future__ import annotations
import asyncio
import copy
import logging
import random
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING
from playwright.async_api import Page
from playwright.async_api import TimeoutError as PlaywrightTimeoutError
from playwright.async_api import async_playwright
from intelliscraper.browser.local import LocalBrowserBackend
from intelliscraper.browser.managed import ManagedBrowserBackend
from intelliscraper.common.constants import (
BROWSER_LAUNCH_OPTIONS,
MAX_PAUSE_MS,
MAX_SCROLL_WAIT_MS,
MIN_PAUSE_MS,
MIN_SCROLL_WAIT_MS,
)
from intelliscraper.common.models import (
Proxy,
RequestEvent,
ScrapeRequest,
ScrapeResponse,
Session,
)
from intelliscraper.enums import BrowsingMode, ScrapStatus
from intelliscraper.proxy.base import ProxyProvider
from intelliscraper.rate_limiter import RateLimiter
if TYPE_CHECKING:
from intelliscraper.browser.backend import BrowserBackend
logger = logging.getLogger(__name__)
class AsyncScraper:
    """Async web scraper with rate limiting and concurrent page support.

    Orchestrates browser backends, page pools, rate limiting, and
    human-like browsing behaviour to scrape URLs concurrently.

    Args:
        headless: Run browser without UI. Only applies in managed
            browser mode. Ignored when ``use_local_browser=True``.
            Defaults to ``True``.
        browser_launch_options: Custom Chromium launch options. Only
            applies in managed browser mode. Defaults to
            ``BROWSER_LAUNCH_OPTIONS``.
        proxy: Proxy configuration or ``ProxyProvider`` instance.
            Only applies in managed browser mode. Defaults to
            ``None``.
        session_data: Pre-authenticated session with cookies,
            localStorage, sessionStorage, and browser fingerprint.
            Only applies in managed browser mode. Defaults to
            ``None``.
        browsing_mode: Behaviour mode — ``FAST`` (no human
            simulation) or ``HUMAN_LIKE`` (scrolling, delays).
            Auto-determined if ``None``. Defaults to ``None``.
        max_concurrent_pages: Number of pages to use for concurrent
            scraping. Defaults to ``1``.
        use_local_browser: If ``True``, connect to an existing Chrome
            instance via CDP instead of launching a new browser.
            Defaults to ``False``.
        max_requests_per_minute: Rate limit shared across all pages.
            Set to ``None`` or ``0`` to disable (default).

    Note:
        ``__init__`` only sets configuration. Call ``initialize()``
        or use the async context manager to start the browser::

            async with AsyncScraper() as scraper:
                result = await scraper.scrape(url)
    """
def __init__(
    self,
    headless: bool = True,
    browser_launch_options: dict = BROWSER_LAUNCH_OPTIONS,
    proxy: Proxy | ProxyProvider | None = None,
    session_data: Session | None = None,
    browsing_mode: BrowsingMode | None = None,
    max_concurrent_pages: int = 1,
    use_local_browser: bool = False,
    max_requests_per_minute: int | None = None,
) -> None:
    """Store configuration and build the backend; no browser is started.

    Args:
        headless: Run the browser without UI.
        browser_launch_options: Launch options; deep-copied so the
            caller's (or the module default) dict is never mutated.
        proxy: A ``Proxy``, or a ``ProxyProvider`` that is resolved to
            a ``Proxy`` once, here.
        session_data: Optional pre-authenticated session.
        browsing_mode: Explicit mode, or ``None`` to auto-resolve.
        max_concurrent_pages: Size of the page pool; must be >= 1.
        use_local_browser: Use the local-browser backend instead of
            the managed one.
        max_requests_per_minute: Passed to the shared ``RateLimiter``.

    Raises:
        ValueError: If ``max_concurrent_pages`` is less than 1.
    """
    logger.debug("Initializing AsyncScraper")

    # Lifecycle flags are assigned before anything that can raise so
    # that ``__del__`` can always read them on a half-built instance.
    self._closed = False
    self._initialized = False

    # ``initialize()`` sizes both the page pool and the semaphore from
    # this value. Zero would make ``scrape()`` wait on the semaphore
    # forever; a negative value would only fail after the browser had
    # already been started.
    if max_concurrent_pages < 1:
        raise ValueError(
            f"max_concurrent_pages must be at least 1, got {max_concurrent_pages}"
        )

    self.use_local_browser = use_local_browser
    self.headless = headless
    self.browser_launch_options = copy.deepcopy(browser_launch_options)
    self.browser_launch_options.update({"headless": headless})

    # Resolve proxy.
    if isinstance(proxy, ProxyProvider):
        logger.debug(
            "Converting ProxyProvider to Proxy: %s",
            proxy.__class__.__name__,
        )
        self.proxy = proxy.get_proxy()
    else:
        self.proxy = proxy

    self.session_data = session_data
    self.max_concurrent_pages = max_concurrent_pages
    self._playwright = None
    self._browser = None
    self._context = None
    self._page_pool: list[Page] = []
    self._semaphore: asyncio.Semaphore | None = None

    # Rate limiter (shared across all pages).
    self._rate_limiter = RateLimiter(
        max_requests_per_minute=max_requests_per_minute
    )

    # Determine browsing mode (reads ``use_local_browser`` and ``proxy``,
    # so it must run after they are set).
    self.browsing_mode = self._resolve_browsing_mode(browsing_mode)

    # Create the backend.
    self._backend: BrowserBackend = self._create_backend()

    # Determine browser mode label for responses.
    self._browser_mode_label = (
        "local_browser" if use_local_browser else "managed_browser"
    )

    if self.proxy:
        logger.info("Using proxy: %s", self.proxy.server)
    if session_data:
        logger.info("Using session data for authenticated scraping")
    logger.info(
        "Scraper mode: %s | pages: %d | browsing: %s | rate limit: %s rpm",
        self._browser_mode_label,
        self.max_concurrent_pages,
        self.browsing_mode.value,
        self._rate_limiter.max_rpm or "unlimited",
    )
# ── Initialisation ────────────────────────────────────────────────
async def initialize(self) -> None:
    """Start the browser and build the page pool.

    Starts Playwright, asks the configured backend for a browser and
    context, opens ``max_concurrent_pages`` pages, and creates the
    semaphore that bounds concurrent scrapes. Calling it again once
    initialisation has completed is a no-op.
    """
    if self._initialized:
        return

    logger.debug("Starting async initialisation")
    playwright = await async_playwright().start()
    self._playwright = playwright
    browser, context = await self._backend.initialize(playwright)
    self._browser = browser
    self._context = context

    pool_size = self.max_concurrent_pages
    logger.debug("Creating page pool with %d pages", pool_size)
    for created in range(1, pool_size + 1):
        new_page = await self._create_configured_page()
        self._page_pool.append(new_page)
        logger.debug("Created page %d/%d", created, pool_size)

    # One permit per pooled page.
    self._semaphore = asyncio.Semaphore(pool_size)
    self._initialized = True
    logger.info(
        "AsyncScraper initialised | mode: %s | pages: %d",
        self._browser_mode_label,
        pool_size,
    )
# ── Context managers ──────────────────────────────────────────────
async def __aenter__(self) -> AsyncScraper:
"""Async context manager entry — initialises the scraper."""
await self.initialize()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool:
    """Async context manager exit — closes the scraper.

    Args:
        exc_type: Exception class raised in the ``async with`` body,
            or ``None``. Unused.
        exc_val: Exception instance, or ``None``. Unused.
        exc_tb: Traceback, or ``None``. Unused.

    Returns:
        ``False``, so an exception raised inside the ``async with``
        body is never suppressed.
    """
    await self.close()
    return False
async def close(self) -> None:
    """Close pooled pages, run backend cleanup, and stop Playwright.

    Safe to call more than once; only the first call does any work.
    Errors are logged rather than raised so that closing never masks
    an exception already propagating out of an ``async with`` block.

    What is torn down beyond the pooled pages is decided by the
    backend's ``cleanup()`` (the local backend is expected to leave
    the user's Chrome running; the managed backend to close its
    context and browser).
    """
    if self._closed:
        return
    self._closed = True
    logger.debug("Starting async cleanup ...")
    try:
        # Iterate over a snapshot: in-flight scrapes may still rotate
        # the pool while we are awaiting ``page.close()``.
        for page in list(self._page_pool):
            try:
                await page.close()
            except Exception as exc:
                logger.debug("Failed to close page: %s", exc)
        try:
            await self._backend.cleanup(self._browser, self._context)
        finally:
            # Stop the Playwright driver even when backend cleanup
            # raised; otherwise the driver would be leaked.
            if self._playwright:
                await self._playwright.stop()
        logger.debug("Async cleanup complete")
    except Exception as exc:
        logger.error("Error during async cleanup: %s", exc)
def __del__(self) -> None:
"""Destructor — warns if the scraper was not properly closed."""
if not self._closed and self._initialized:
logger.warning(
"AsyncScraper was not properly closed. "
"Use 'async with AsyncScraper()' or call "
"'await scraper.close()'"
)
# ── Public API ────────────────────────────────────────────────────
async def scrape(
    self,
    url: str,
    timeout: timedelta = timedelta(seconds=30),
) -> ScrapeResponse:
    """Scrape content from a single URL.

    The semaphore ensures only ``max_concurrent_pages`` requests run
    simultaneously. Each request checks one page out of the pool for
    its exclusive use and hands it back when done, so two concurrent
    requests never navigate the same page. Rate limiting is applied
    before navigation.

    Failures during scraping (closed scraper, invalid URL, navigation
    errors) are reported through the returned response's ``status``
    and ``error_msg`` rather than raised.

    Args:
        url: Target URL to scrape.
        timeout: Maximum time to wait for page load. Defaults
            to 30 seconds.

    Returns:
        A ``ScrapeResponse`` with status, HTML content, HTTP
        status code, timing, and metadata.

    Raises:
        RuntimeError: If the scraper was never initialised.

    Example::

        async with AsyncScraper(max_concurrent_pages=4) as scraper:
            tasks = [
                scraper.scrape("https://example1.com"),
                scraper.scrape("https://example2.com"),
            ]
            results = await asyncio.gather(*tasks)
    """
    if self._semaphore is None:
        # Without this guard the ``async with`` below fails with an
        # opaque TypeError about ``None``.
        raise RuntimeError(
            "Scraper is not initialized. Call 'await scraper.initialize()' "
            "or use 'async with AsyncScraper()' before scraping."
        )

    sent_at = datetime.now(timezone.utc).timestamp()

    # Built before the ``try`` so every handler below can reference it.
    scrape_request = ScrapeRequest(
        url=url,
        timeout=timeout,
        browser_launch_options=self.browser_launch_options,
        proxy=self.proxy,
        session_data=self.session_data,
        browsing_mode=self.browsing_mode,
    )

    async with self._semaphore:
        # Check the page out of the pool. Rotating it to the back while
        # leaving it in the pool would let a later task receive a page
        # that an earlier, slower task is still navigating. The pool
        # cannot be empty here: there is one permit per page and every
        # page is returned (below) before its permit is released.
        page = self._page_pool.pop(0)
        try:
            if self._closed:
                logger.error("Cannot scrape: scraper is closed")
                raise RuntimeError(
                    "Scraper is closed. Create a new instance "
                    "or use the context manager."
                )
            if self.session_data and not url.startswith(self.session_data.base_url):
                logger.warning(
                    "URL %s does not match session base URL %s. "
                    "Scraping may fail due to invalid session.",
                    url,
                    self.session_data.base_url,
                )
            self._validate_url(url)
            logger.info("Scraping: %s", url)

            # Apply rate limiting before navigation.
            await self._rate_limiter.acquire()
            logger.debug("Navigating to: %s", url)
            response = await page.goto(
                url=url,
                wait_until="networkidle",
                timeout=timeout.total_seconds() * 1000,
            )

            # Capture HTTP status code from the server response.
            http_status_code = response.status if response else None
            logger.debug("Page loaded: %s (HTTP %s)", url, http_status_code)

            # Check for rate limiting or blocking.
            status = self._classify_http_status(http_status_code)
            if self.browsing_mode == BrowsingMode.HUMAN_LIKE:
                await self._apply_human_like_behavior(page)
            html_content = await page.content()
            elapsed_time = datetime.now(timezone.utc).timestamp() - sent_at
            logger.info(
                "Scraping finished: %s in %.2fs (HTTP %s, status=%s)",
                url,
                elapsed_time,
                http_status_code,
                status.value,
            )
            self._record_event(sent_at=sent_at, status=status)
            return ScrapeResponse(
                scrape_request=scrape_request,
                status=status,
                http_status_code=http_status_code,
                elapsed_time=elapsed_time,
                scrap_html_content=html_content,
                session_id=self._get_session_id(),
                browser_mode=self._browser_mode_label,
            )
        except PlaywrightTimeoutError as exc:
            # Reading the content is best-effort: it can itself fail
            # while the page is mid-navigation or already closed, and an
            # exception raised here would escape ``scrape`` entirely.
            try:
                html_content = await page.content()
            except Exception as content_exc:
                logger.debug(
                    "Could not read page content after timeout for %s: %s",
                    url,
                    content_exc,
                )
                html_content = ""
            elapsed_time = datetime.now(timezone.utc).timestamp() - sent_at
            # If we captured meaningful HTML before the timeout, it's a partial success
            if html_content and len(html_content.strip()) > 100:
                status = ScrapStatus.PARTIAL_SUCCESS
            else:
                status = ScrapStatus.TIMEOUT
            self._record_event(sent_at=sent_at, status=status)
            logger.warning(
                "Timeout scraping %s after %.1fs. "
                "Returning partial content (status=%s).",
                url,
                timeout.total_seconds(),
                status.value,
            )
            return ScrapeResponse(
                scrape_request=scrape_request,
                status=status,
                elapsed_time=elapsed_time,
                scrap_html_content=html_content,
                error_msg=str(exc),
                session_id=self._get_session_id(),
                browser_mode=self._browser_mode_label,
            )
        except Exception as exc:
            elapsed_time = datetime.now(timezone.utc).timestamp() - sent_at
            logger.error("Failed to scrape %s: %s", url, exc, exc_info=True)
            self._record_event(sent_at=sent_at, status=ScrapStatus.FAILED)
            return ScrapeResponse(
                scrape_request=scrape_request,
                status=ScrapStatus.FAILED,
                elapsed_time=elapsed_time,
                error_msg=str(exc),
                session_id=self._get_session_id(),
                browser_mode=self._browser_mode_label,
            )
        finally:
            # Return the page first so the pool size stays constant,
            # then, if ``close()`` ran while this page was checked out
            # (and therefore could not see it), close it here.
            self._page_pool.append(page)
            if self._closed:
                try:
                    await page.close()
                except Exception as close_exc:
                    logger.debug("Failed to close page: %s", close_exc)
async def batch_scrape(
    self,
    urls: list[str],
    timeout: timedelta = timedelta(seconds=30),
) -> list[ScrapeResponse]:
    """Scrape many URLs concurrently and return one response per URL.

    Every URL is handed to ``scrape()``, which applies the rate limit
    and the page-pool semaphore; this method only fans the calls out,
    gathers them, and logs a per-status summary.

    Args:
        urls: Target URLs to scrape.
        timeout: Maximum time per page load. Defaults to 30 seconds.

    Returns:
        ``ScrapeResponse`` objects in the same order as ``urls``.

    Example::

        async with AsyncScraper(
            max_concurrent_pages=4,
            max_requests_per_minute=900,  # 15/sec
        ) as scraper:
            results = await scraper.batch_scrape(
                urls=[f"https://example.com/page/{i}" for i in range(100)]
            )
            for result in results:
                print(result.scrape_request.url, result.status)
    """
    total = len(urls)
    logger.info(
        "Starting batch scrape of %d URLs (concurrency=%d, rate_limit=%s rpm)",
        total,
        self.max_concurrent_pages,
        self._rate_limiter.max_rpm or "unlimited",
    )

    pending = [self.scrape(url=target, timeout=timeout) for target in urls]
    responses = await asyncio.gather(*pending, return_exceptions=False)

    # Tally outcomes by status value for the summary line.
    tally = {}
    for response in responses:
        label = response.status.value
        tally[label] = tally.get(label, 0) + 1
    summary = ", ".join(f"{k}: {v}" for k, v in sorted(tally.items()))
    logger.info("Batch scrape complete: %d URLs — %s", total, summary)
    return list(responses)
# ── Private helpers ───────────────────────────────────────────────
def _resolve_browsing_mode(
self,
explicit_mode: BrowsingMode | None,
) -> BrowsingMode:
"""Determine the browsing mode based on configuration.
Priority:
1. Explicit ``browsing_mode`` always wins.
2. Local browser → ``HUMAN_LIKE`` (real Chrome, act human).
3. Proxy provided → ``FAST`` (proxy handles identity).
4. Session data → ``HUMAN_LIKE`` (authenticated, be subtle).
5. Default → ``HUMAN_LIKE``.
Args:
explicit_mode: Explicitly requested mode, or ``None``.
Returns:
The resolved ``BrowsingMode``.
"""
if explicit_mode:
return explicit_mode
if self.use_local_browser:
return BrowsingMode.HUMAN_LIKE
if self.proxy:
return BrowsingMode.FAST
return BrowsingMode.HUMAN_LIKE
def _create_backend(self) -> BrowserBackend:
    """Build the browser backend selected by ``use_local_browser``.

    Returns:
        A ``ManagedBrowserBackend`` configured with the launch
        options, proxy, and session data, or — when
        ``use_local_browser`` is set — a ``LocalBrowserBackend``.
    """
    if not self.use_local_browser:
        return ManagedBrowserBackend(
            headless=self.headless,
            browser_launch_options=self.browser_launch_options,
            proxy=self.proxy,
            session_data=self.session_data,
        )
    return LocalBrowserBackend(headless=self.headless)
async def _create_configured_page(self) -> Page:
    """Open a page in the current context and prepare it for use.

    Returns:
        The new Playwright ``Page``; when the backend is a
        ``ManagedBrowserBackend`` its session storage has been
        applied to the page first.
    """
    logger.debug("Creating and configuring new page")
    new_page = await self._context.new_page()

    backend = self._backend
    # Only the managed backend exposes session-storage injection.
    if isinstance(backend, ManagedBrowserBackend):
        await backend.apply_session_storage(new_page)
    return new_page
def _get_available_page(self) -> Page:
"""Get the next available page from the pool (round-robin).
Returns:
A ``Page`` from the pool.
"""
page = self._page_pool.pop(0)
self._page_pool.append(page)
return page
def _validate_url(self, url: str) -> None:
"""Validate that the URL has a proper HTTP(S) scheme.
Args:
url: URL string to validate.
Raises:
ValueError: If the URL is empty or doesn't start with
``http://`` or ``https://``.
"""
if not url or not url.startswith(("http://", "https://")):
raise ValueError(f"Invalid URL: {url}")
def _classify_http_status(self, http_status_code: int | None) -> ScrapStatus:
    """Map an HTTP status code to a ``ScrapStatus``.

    Args:
        http_status_code: The HTTP status code, or ``None`` when the
            navigation produced no response object.

    Returns:
        ``RATE_LIMITED`` for 429, ``BLOCKED`` for 403, ``SUCCESS`` for
        2xx/3xx and for ``None``, and ``FAILED`` for every other code.
    """
    # ``scrape()`` passes ``None`` when ``page.goto()`` returned no
    # response; it still reads the page content in that case, so the
    # missing status is not treated as an error.
    if http_status_code is None:
        return ScrapStatus.SUCCESS
    if http_status_code == 429:
        return ScrapStatus.RATE_LIMITED
    if http_status_code == 403:
        return ScrapStatus.BLOCKED
    if 200 <= http_status_code < 400:
        return ScrapStatus.SUCCESS
    # Remaining codes (other 4xx, 5xx, anything outside 2xx/3xx) are
    # errors and must not be reported as successful scrapes.
    return ScrapStatus.FAILED
def _get_session_id(self) -> str | None:
"""Return the session site identifier, or ``None``.
Returns:
The ``site`` field from the configured session data.
"""
return self.session_data.site if self.session_data else None
def _record_event(self, status: ScrapStatus, sent_at: float) -> None:
"""Record a scraping event to the session statistics.
If session data is configured, adds a ``RequestEvent`` to the
session's time-series log for performance analysis.
Args:
status: Outcome status of the request.
sent_at: Unix timestamp when the request was sent.
"""
if self.session_data:
self.session_data.stats.add_request_event(
request_event=RequestEvent(sent_at=sent_at, request_status=status)
)
async def _apply_human_like_behavior(self, page: Page) -> None:
    """Scroll the page to a random depth and pause, best-effort.

    Smooth-scrolls to a point between 20% and 80% of the document
    height, then waits for two randomised intervals. Any failure is
    logged at debug level and otherwise ignored.

    Args:
        page: Playwright ``Page`` to apply behaviour to.
    """
    try:
        document_height = await page.evaluate("document.body.scrollHeight")
        if document_height <= 0:
            return

        depth_fraction = random.uniform(0.2, 0.8)
        target_y = int(document_height * depth_fraction)
        scroll_script = (
            f"\nwindow.scrollTo({{\ntop: {target_y},\nbehavior: 'smooth'\n}});\n"
        )
        await page.evaluate(scroll_script)

        # Let the smooth scroll settle, then linger like a reader would.
        settle_ms = random.randint(MIN_SCROLL_WAIT_MS, MAX_SCROLL_WAIT_MS)
        await page.wait_for_timeout(settle_ms)
        pause_ms = random.randint(MIN_PAUSE_MS, MAX_PAUSE_MS)
        await page.wait_for_timeout(pause_ms)
    except Exception as exc:
        logger.debug("Human-like behaviour failed: %s", exc)