Web Scraping with httpx and Python in 2026: The Complete Guide

Updated March 2026 · 20 min read

httpx is a modern Python HTTP client that is steadily replacing Requests for serious web scraping. With native async support, HTTP/2, and a Requests-like API, it can fetch pages roughly 10x faster than a synchronous loop, without the extra complexity of aiohttp.

This guide covers everything from basic requests to production-ready async scrapers with proxy rotation, rate limiting, and error handling.

Installation & Setup

Install httpx with optional HTTP/2 and SOCKS proxy support:

# Basic installation
pip install httpx

# With HTTP/2 support (recommended; quote the extras so shells like zsh don't expand the brackets)
pip install "httpx[http2]"

# With SOCKS proxy support
pip install "httpx[socks]"

# Full installation (HTTP/2 + SOCKS + CLI)
pip install "httpx[http2,socks,cli]"

# Also install BeautifulSoup for HTML parsing
pip install beautifulsoup4 lxml

Verify your installation:

import httpx
print(httpx.__version__)  # 0.28.x or later

Basic Scraping with httpx

httpx's API is nearly identical to Requests — if you know Requests, you already know httpx:

import httpx
from bs4 import BeautifulSoup

# Simple GET request
response = httpx.get("https://example.com")
print(response.status_code)  # 200
print(response.text[:200])   # HTML content

# Parse with BeautifulSoup
soup = BeautifulSoup(response.text, "lxml")
title = soup.find("title").text
print(f"Page title: {title}")

Using a Client (Recommended)

Always use an httpx.Client for scraping — it reuses connections and is significantly faster:

import httpx
from bs4 import BeautifulSoup

# Client reuses TCP connections (like requests.Session)
with httpx.Client() as client:
    # First request opens connection
    response = client.get("https://example.com/page1")
    soup = BeautifulSoup(response.text, "lxml")
    items_page1 = soup.select(".item")
    
    # Subsequent requests reuse the connection — much faster
    response = client.get("https://example.com/page2")
    soup = BeautifulSoup(response.text, "lxml")
    items_page2 = soup.select(".item")
    
    print(f"Page 1: {len(items_page1)} items")
    print(f"Page 2: {len(items_page2)} items")

POST Requests & Form Data

import httpx

with httpx.Client() as client:
    # POST with form data
    response = client.post(
        "https://example.com/search",
        data={"query": "web scraping", "page": 1}
    )
    
    # POST with JSON
    response = client.post(
        "https://api.example.com/search",
        json={"query": "web scraping", "limit": 50}
    )
    
    # Access JSON response
    results = response.json()
    for item in results["data"]:
        print(item["title"])

Async Scraping with asyncio

This is where httpx shines. Async scraping lets you fetch many pages concurrently instead of one-by-one:

import httpx
import asyncio
from bs4 import BeautifulSoup

async def scrape_page(client, url):
    """Scrape a single page."""
    response = await client.get(url)
    soup = BeautifulSoup(response.text, "lxml")
    title = soup.find("h1").text if soup.find("h1") else "No title"
    return {"url": url, "title": title}

async def main():
    urls = [f"https://example.com/page/{i}" for i in range(1, 51)]
    
    async with httpx.AsyncClient() as client:
        # Fetch all 50 pages concurrently
        tasks = [scrape_page(client, url) for url in urls]
        results = await asyncio.gather(*tasks)
    
    for result in results:
        print(f"{result['title']} — {result['url']}")

asyncio.run(main())

Performance comparison: Scraping 50 pages synchronously with Requests takes ~25 seconds (0.5s per page). Async httpx does it in ~2-3 seconds. That's a 10x speedup.
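The speedup is easy to demonstrate without touching the network: simulate each "fetch" with a short asyncio.sleep (the 0.05 s delay below is an arbitrary stand-in for real latency, not a measurement) and time a sequential loop against asyncio.gather:

```python
import asyncio
import time

async def fake_fetch(url: str, delay: float = 0.05) -> str:
    # Stand-in for client.get(): waits like a network call, returns fake HTML
    await asyncio.sleep(delay)
    return f"<html>{url}</html>"

async def sequential(urls):
    # One request at a time: total time is roughly len(urls) * delay
    return [await fake_fetch(u) for u in urls]

async def concurrent(urls):
    # All requests overlap: total time is roughly one delay
    return await asyncio.gather(*(fake_fetch(u) for u in urls))

urls = [f"https://example.com/page/{i}" for i in range(20)]

start = time.perf_counter()
asyncio.run(sequential(urls))
seq_time = time.perf_counter() - start

start = time.perf_counter()
asyncio.run(concurrent(urls))
conc_time = time.perf_counter() - start

print(f"sequential: {seq_time:.2f}s | concurrent: {conc_time:.2f}s")
```

On most machines the concurrent run finishes in roughly the time of a single simulated request; that overlap of I/O waits is exactly what asyncio.gather buys you with real httpx calls.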

Controlled Concurrency with Semaphore

Don't blast 1,000 requests at once — use a semaphore to limit concurrency:

import httpx
import asyncio
from bs4 import BeautifulSoup

async def scrape_page(client, url, semaphore):
    """Scrape with concurrency limit."""
    async with semaphore:
        response = await client.get(url)
        await asyncio.sleep(0.1)  # Polite delay
        soup = BeautifulSoup(response.text, "lxml")
        return {
            "url": url,
            "title": soup.find("h1").text if soup.find("h1") else "N/A",
            "status": response.status_code
        }

async def main():
    urls = [f"https://example.com/item/{i}" for i in range(1, 201)]
    semaphore = asyncio.Semaphore(10)  # Max 10 concurrent requests
    
    async with httpx.AsyncClient(
        timeout=httpx.Timeout(30.0),
        follow_redirects=True
    ) as client:
        tasks = [scrape_page(client, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    
    successful = [r for r in results if isinstance(r, dict)]
    failed = [r for r in results if isinstance(r, Exception)]
    print(f"Scraped: {len(successful)} | Failed: {len(failed)}")

asyncio.run(main())

HTTP/2 for Faster Scraping

httpx is one of the few Python HTTP libraries that support HTTP/2, which multiplexes requests over a single connection for fewer handshakes and less overhead:

import httpx
import asyncio

async def scrape_with_http2():
    # Enable HTTP/2 — requires httpx[http2]
    async with httpx.AsyncClient(http2=True) as client:
        response = await client.get("https://example.com")
        
        # Check which protocol was used
        print(f"HTTP version: {response.http_version}")  # "HTTP/2" if the server supports it
        print(f"Status: {response.status_code}")
        
        # Multiple requests multiplex over one connection
        urls = [f"https://example.com/page/{i}" for i in range(1, 11)]
        tasks = [client.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)
        
        for resp in responses:
            print(f"{resp.url} — {resp.http_version} — {resp.status_code}")

asyncio.run(scrape_with_http2())

When HTTP/2 helps most: Scraping many pages from the same domain. The multiplexing means all requests share one TCP connection instead of opening 10+ separate connections.

Headers & User-Agent Rotation

Proper headers are essential to avoid blocks:

import httpx
import random

USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.4 Safari/605.1.15",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:125.0) Gecko/20100101 Firefox/125.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 14.4; rv:125.0) Gecko/20100101 Firefox/125.0",
]

def get_headers():
    """Generate realistic browser headers."""
    ua = random.choice(USER_AGENTS)
    return {
        "User-Agent": ua,
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
        "Accept-Encoding": "gzip, deflate, br",
        "DNT": "1",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
    }

# Set default headers on the client
with httpx.Client(headers=get_headers()) as client:
    response = client.get("https://example.com")
    print(response.status_code)

# Rotate per-request
with httpx.Client() as client:
    for i in range(10):
        response = client.get(
            f"https://example.com/page/{i}",
            headers=get_headers()
        )
        print(f"Page {i}: {response.status_code}")

Sessions, Cookies & Authentication

httpx clients automatically handle cookies across requests:

import httpx

# Cookies persist across requests within a client
with httpx.Client() as client:
    # Login
    login_response = client.post(
        "https://example.com/login",
        data={"username": "user", "password": "pass"}
    )
    # Session cookie is now stored
    
    # Subsequent requests include the session cookie
    dashboard = client.get("https://example.com/dashboard")
    print(dashboard.status_code)
    
    # Access cookies
    for name, value in client.cookies.items():
        print(f"Cookie: {name} = {value[:20]}...")

# Manual cookie setting
with httpx.Client(cookies={"session_id": "abc123"}) as client:
    response = client.get("https://example.com/protected")

# Bearer token authentication
with httpx.Client() as client:
    response = client.get(
        "https://api.example.com/data",
        headers={"Authorization": "Bearer your-api-token-here"}
    )

# Basic auth (built-in)
with httpx.Client(auth=("username", "password")) as client:
    response = client.get("https://example.com/api/data")

Async Session with Login

import httpx
import asyncio
from bs4 import BeautifulSoup

async def scrape_authenticated():
    async with httpx.AsyncClient(follow_redirects=True) as client:
        # Login
        await client.post(
            "https://example.com/login",
            data={"email": "user@example.com", "password": "secret"}
        )
        
        # Now scrape protected pages
        urls = [f"https://example.com/data/page/{i}" for i in range(1, 21)]
        tasks = [client.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)
        
        for resp in responses:
            soup = BeautifulSoup(resp.text, "lxml")
            data = soup.select(".data-row")
            print(f"{resp.url}: {len(data)} rows")

asyncio.run(scrape_authenticated())

Proxy Rotation

httpx supports HTTP, HTTPS, and SOCKS proxies:

import httpx
import random

# Single proxy
with httpx.Client(proxy="http://proxy.example.com:8080") as client:
    response = client.get("https://example.com")

# SOCKS proxy (requires httpx[socks])
with httpx.Client(proxy="socks5://proxy.example.com:1080") as client:
    response = client.get("https://example.com")

# Authenticated proxy
with httpx.Client(proxy="http://user:pass@proxy.example.com:8080") as client:
    response = client.get("https://example.com")

Rotating Proxies in Async Mode

import httpx
import asyncio
import random

PROXIES = [
    "http://proxy1.example.com:8080",
    "http://proxy2.example.com:8080",
    "http://proxy3.example.com:8080",
    "http://proxy4.example.com:8080",
    "http://proxy5.example.com:8080",
]

async def scrape_with_proxy(url, semaphore):
    """Scrape using a random proxy."""
    async with semaphore:
        proxy = random.choice(PROXIES)
        async with httpx.AsyncClient(proxy=proxy, timeout=20.0) as client:
            try:
                response = await client.get(url)
                return {"url": url, "status": response.status_code, "proxy": proxy}
            except httpx.ProxyError:
                # Proxy failed; fall back to a direct request (note: this exposes your real IP)
                async with httpx.AsyncClient(timeout=20.0) as direct:
                    response = await direct.get(url)
                    return {"url": url, "status": response.status_code, "proxy": "direct"}

async def main():
    urls = [f"https://example.com/page/{i}" for i in range(1, 101)]
    semaphore = asyncio.Semaphore(10)
    tasks = [scrape_with_proxy(url, semaphore) for url in urls]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    successful = [r for r in results if isinstance(r, dict)]
    print(f"Scraped {len(successful)} pages via proxies")

asyncio.run(main())

Timeout & Retry Configuration

httpx has granular timeout control — much better than Requests:

import httpx

# Granular timeouts
timeout = httpx.Timeout(
    connect=5.0,    # Time to establish connection
    read=15.0,      # Time to read response
    write=5.0,      # Time to send request
    pool=10.0       # Time to acquire connection from pool
)

with httpx.Client(timeout=timeout) as client:
    response = client.get("https://example.com")

# Simple timeout (applies to all phases)
with httpx.Client(timeout=30.0) as client:
    response = client.get("https://example.com")

# Disable timeout (not recommended)
with httpx.Client(timeout=None) as client:
    response = client.get("https://slow-site.com")

Retry Logic with Exponential Backoff

import httpx
import asyncio
import random

async def fetch_with_retry(client, url, max_retries=3):
    """Fetch URL with exponential backoff retry."""
    for attempt in range(max_retries):
        try:
            response = await client.get(url)
            
            if response.status_code == 200:
                return response
            elif response.status_code == 429:
                # Rate limited; wait as instructed (assumes a numeric Retry-After value)
                retry_raw = response.headers.get("Retry-After", "5")
                retry_after = int(retry_raw) if retry_raw.isdigit() else 5
                await asyncio.sleep(retry_after)
            elif response.status_code >= 500:
                # Server error — retry with backoff
                wait = (2 ** attempt) + random.uniform(0, 1)
                await asyncio.sleep(wait)
            else:
                return response  # 4xx errors — don't retry
                
        except (httpx.ConnectTimeout, httpx.ReadTimeout):
            wait = (2 ** attempt) + random.uniform(0, 1)
            await asyncio.sleep(wait)
        except httpx.ConnectError:
            wait = (2 ** attempt) + random.uniform(0, 1)
            await asyncio.sleep(wait)
    
    return None  # All retries failed

async def main():
    async with httpx.AsyncClient(timeout=httpx.Timeout(20.0)) as client:
        response = await fetch_with_retry(client, "https://example.com")
        if response:
            print(f"Success: {response.status_code}")
        else:
            print("All retries failed")

asyncio.run(main())
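One caveat with the code above: it reads Retry-After as a plain number of seconds, but HTTP also allows an HTTP-date value. A small stdlib helper (a sketch; parse_retry_after is our own name, not an httpx API) handles both forms:

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime

def parse_retry_after(value, default=5.0):
    """Convert a Retry-After header value into seconds to wait."""
    if not value:
        return default
    # Form 1: delay in seconds, e.g. "120"
    if value.strip().isdigit():
        return float(value)
    # Form 2: HTTP-date, e.g. "Wed, 21 Oct 2026 07:28:00 GMT"
    try:
        when = parsedate_to_datetime(value)
        return max(0.0, (when - datetime.now(timezone.utc)).total_seconds())
    except (TypeError, ValueError):
        return default

print(parse_retry_after("120"))   # 120.0
print(parse_retry_after(None))    # 5.0
```

Drop it into fetch_with_retry in place of the int(...) conversion if the sites you scrape send date-form headers.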

Parsing HTML with BeautifulSoup

httpx fetches the HTML; BeautifulSoup parses it. Here's a complete pattern:

import httpx
import asyncio
from bs4 import BeautifulSoup

async def scrape_products(client, url):
    """Extract product data from a page."""
    response = await client.get(url)
    soup = BeautifulSoup(response.text, "lxml")
    
    products = []
    for card in soup.select(".product-card"):
        product = {
            "name": card.select_one(".product-name").text.strip(),
            "price": card.select_one(".price").text.strip(),
            "rating": card.select_one(".rating").text.strip() if card.select_one(".rating") else "N/A",
            "url": card.select_one("a")["href"] if card.select_one("a") else None,
            "in_stock": "out-of-stock" not in card.get("class", []),
        }
        products.append(product)
    
    return products

async def main():
    async with httpx.AsyncClient(
        headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/124.0.0.0"},
        timeout=20.0,
        follow_redirects=True
    ) as client:
        urls = [f"https://example.com/products?page={i}" for i in range(1, 11)]
        tasks = [scrape_products(client, url) for url in urls]
        results = await asyncio.gather(*tasks)
    
    all_products = [p for page in results for p in page]
    print(f"Scraped {len(all_products)} products from 10 pages")
    
    for product in all_products[:5]:
        print(f"  {product['name']} — {product['price']}")

asyncio.run(main())

Extracting JSON from APIs

import httpx
import asyncio

async def discover_api(client, page_url):
    """Many 'dynamic' sites load data from JSON APIs."""
    # Step 1: Check if there's a JSON API endpoint
    api_url = page_url.replace("/products", "/api/products")
    
    try:
        response = await client.get(api_url, headers={"Accept": "application/json"})
        if response.status_code == 200 and "application/json" in response.headers.get("content-type", ""):
            return response.json()  # Direct structured data!
    except Exception:
        pass
    
    return None  # Fall back to HTML scraping

async def main():
    async with httpx.AsyncClient() as client:
        # Try the API first — it's faster and more reliable
        data = await discover_api(client, "https://example.com/products?page=1")
        if data:
            print(f"Found API! Got {len(data.get('items', []))} items")
        else:
            print("No API found — fall back to HTML scraping")

asyncio.run(main())

Handling Pagination

Page Number Pagination

import httpx
import asyncio
from bs4 import BeautifulSoup

async def scrape_all_pages(base_url, max_pages=100):
    """Scrape paginated content with async httpx."""
    all_items = []
    semaphore = asyncio.Semaphore(5)
    
    async with httpx.AsyncClient(timeout=20.0, follow_redirects=True) as client:
        # First, discover total pages
        response = await client.get(f"{base_url}?page=1")
        soup = BeautifulSoup(response.text, "lxml")
        
        # Find the last page number (skip non-numeric links like "Next", which would break int())
        pagination = soup.select(".pagination a")
        page_numbers = [int(a.text) for a in pagination if a.text.strip().isdigit()]
        last_page = min(max(page_numbers), max_pages) if page_numbers else 1
        
        # Scrape all pages concurrently
        async def fetch_page(page_num):
            async with semaphore:
                resp = await client.get(f"{base_url}?page={page_num}")
                s = BeautifulSoup(resp.text, "lxml")
                return s.select(".item")
        
        tasks = [fetch_page(i) for i in range(1, last_page + 1)]
        results = await asyncio.gather(*tasks)
        
        for page_items in results:
            for item in page_items:
                all_items.append(item.text.strip())
    
    return all_items

items = asyncio.run(scrape_all_pages("https://example.com/listings"))
print(f"Total items: {len(items)}")

Cursor-Based Pagination

import httpx
import asyncio

async def scrape_cursor_api():
    """Handle cursor/token-based pagination (common in APIs)."""
    all_data = []
    cursor = None
    
    async with httpx.AsyncClient(timeout=20.0) as client:
        while True:
            params = {"limit": 100}
            if cursor:
                params["cursor"] = cursor
            
            response = await client.get(
                "https://api.example.com/items",
                params=params
            )
            data = response.json()
            
            all_data.extend(data["items"])
            print(f"Fetched {len(data['items'])} items (total: {len(all_data)})")
            
            cursor = data.get("next_cursor")
            if not cursor:
                break
            
            await asyncio.sleep(0.2)  # Polite delay
    
    return all_data

items = asyncio.run(scrape_cursor_api())
print(f"Total: {len(items)} items")
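The while/cursor loop above is easy to get subtly wrong (missed break, dropped last page). You can exercise the exact termination logic offline against a stubbed fetch function with made-up cursors and items:

```python
def fetch_page(cursor=None):
    """Stub for the API call: three fake pages of items, chained by cursor."""
    pages = {
        None: {"items": [1, 2, 3], "next_cursor": "c1"},
        "c1": {"items": [4, 5], "next_cursor": "c2"},
        "c2": {"items": [6], "next_cursor": None},
    }
    return pages[cursor]

def drain(fetch):
    """Same cursor loop as above, minus the HTTP client."""
    all_data, cursor = [], None
    while True:
        data = fetch(cursor)
        all_data.extend(data["items"])
        cursor = data.get("next_cursor")
        if not cursor:
            break
    return all_data

print(drain(fetch_page))  # [1, 2, 3, 4, 5, 6]
```

The real scraper just swaps fetch_page for the awaited httpx call; the loop and its exit condition stay identical.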

Concurrency Control with Semaphore

The key to production async scraping — control your concurrency:

import httpx
import asyncio
import time
from bs4 import BeautifulSoup

class AsyncScraper:
    """Concurrency-controlled async scraper."""
    
    def __init__(self, max_concurrent=10, delay=0.1):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.delay = delay
        self.results = []
        self.errors = []
    
    async def fetch(self, client, url):
        """Fetch a single URL with concurrency control."""
        async with self.semaphore:
            try:
                response = await client.get(url)
                await asyncio.sleep(self.delay)
                
                if response.status_code == 200:
                    return {"url": url, "html": response.text, "status": 200}
                else:
                    return {"url": url, "html": None, "status": response.status_code}
            except Exception as e:
                self.errors.append({"url": url, "error": str(e)})
                return None
    
    async def scrape(self, urls):
        """Scrape all URLs with controlled concurrency."""
        start = time.time()
        
        async with httpx.AsyncClient(
            timeout=httpx.Timeout(20.0),
            follow_redirects=True,
            http2=True
        ) as client:
            tasks = [self.fetch(client, url) for url in urls]
            results = await asyncio.gather(*tasks)
        
        self.results = [r for r in results if r and r["html"]]
        elapsed = time.time() - start
        
        print(f"Scraped {len(self.results)}/{len(urls)} pages in {elapsed:.1f}s")
        print(f"Errors: {len(self.errors)}")
        print(f"Rate: {len(self.results)/elapsed:.1f} pages/sec")
        
        return self.results

# Usage
async def main():
    scraper = AsyncScraper(max_concurrent=15, delay=0.2)
    urls = [f"https://example.com/item/{i}" for i in range(1, 501)]
    results = await scraper.scrape(urls)
    
    # Parse results
    for r in results[:5]:
        soup = BeautifulSoup(r["html"], "lxml")
        title = soup.find("h1")
        print(f"{r['url']}: {title.text if title else 'no title'}")

asyncio.run(main())

Production-Ready Scraper

Here's a complete, production-grade async scraper with retries, logging, proxy rotation, rate limiting, and CSV/JSON export:

import httpx
import asyncio
import random
import csv
import json
import logging
import time
from bs4 import BeautifulSoup
from dataclasses import dataclass, asdict
from pathlib import Path

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("scraper")

USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_4) AppleWebKit/605.1.15 Safari/605.1.15",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:125.0) Gecko/20100101 Firefox/125.0",
]

@dataclass
class ScrapedItem:
    url: str
    title: str
    price: str
    rating: str
    in_stock: bool

class ProductionScraper:
    """Production-grade async web scraper with httpx."""
    
    def __init__(
        self,
        max_concurrent: int = 10,
        delay: float = 0.2,
        max_retries: int = 3,
        proxies: list = None,
    ):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.delay = delay
        self.max_retries = max_retries
        self.proxies = proxies or []
        self.results: list[ScrapedItem] = []
        self.errors: list[dict] = []
        self.stats = {"requests": 0, "success": 0, "failed": 0, "retries": 0}
    
    def _get_headers(self):
        return {
            "User-Agent": random.choice(USER_AGENTS),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.9",
            "Accept-Encoding": "gzip, deflate, br",
        }
    
    def _get_proxy(self):
        return random.choice(self.proxies) if self.proxies else None
    
    async def _fetch(self, client, url):
        """Fetch with retry and exponential backoff."""
        for attempt in range(self.max_retries):
            try:
                self.stats["requests"] += 1
                response = await client.get(url, headers=self._get_headers())
                
                if response.status_code == 200:
                    return response.text
                elif response.status_code == 429:
                    retry_raw = response.headers.get("Retry-After", "5")
                    wait = int(retry_raw) if retry_raw.isdigit() else 5
                    logger.warning(f"Rate limited on {url}, waiting {wait}s")
                    await asyncio.sleep(wait)
                    self.stats["retries"] += 1
                elif response.status_code >= 500:
                    wait = (2 ** attempt) + random.uniform(0, 1)
                    logger.warning(f"Server error {response.status_code} on {url}, retry in {wait:.1f}s")
                    await asyncio.sleep(wait)
                    self.stats["retries"] += 1
                else:
                    logger.error(f"HTTP {response.status_code} on {url}")
                    return None
                    
            except (httpx.TimeoutException, httpx.ConnectError) as e:
                wait = (2 ** attempt) + random.uniform(0, 1)
                logger.warning(f"{type(e).__name__} on {url}, retry in {wait:.1f}s")
                await asyncio.sleep(wait)
                self.stats["retries"] += 1
        
        return None
    
    def _parse(self, url, html):
        """Parse HTML and extract data."""
        soup = BeautifulSoup(html, "lxml")
        
        items = []
        for card in soup.select(".product-card"):
            try:
                item = ScrapedItem(
                    url=url,
                    title=card.select_one(".product-name").text.strip(),
                    price=card.select_one(".price").text.strip(),
                    rating=card.select_one(".rating").text.strip() if card.select_one(".rating") else "N/A",
                    in_stock="out-of-stock" not in card.get("class", []),
                )
                items.append(item)
            except (AttributeError, TypeError) as e:
                logger.debug(f"Parse error on {url}: {e}")
        
        return items
    
    async def _scrape_url(self, client, url):
        """Scrape a single URL with concurrency control."""
        async with self.semaphore:
            html = await self._fetch(client, url)
            await asyncio.sleep(self.delay)
            
            if html:
                items = self._parse(url, html)
                self.results.extend(items)
                self.stats["success"] += 1
                return items
            else:
                self.errors.append({"url": url})
                self.stats["failed"] += 1
                return []
    
    async def scrape(self, urls: list[str]) -> list[ScrapedItem]:
        """Scrape all URLs."""
        start = time.time()
        logger.info(f"Starting scrape of {len(urls)} URLs")
        
        client_kwargs = {
            "timeout": httpx.Timeout(20.0),
            "follow_redirects": True,
            "http2": True,
        }
        proxy = self._get_proxy()
        if proxy:
            client_kwargs["proxy"] = proxy
        
        async with httpx.AsyncClient(**client_kwargs) as client:
            tasks = [self._scrape_url(client, url) for url in urls]
            await asyncio.gather(*tasks)
        
        elapsed = time.time() - start
        logger.info(
            f"Done in {elapsed:.1f}s | "
            f"Success: {self.stats['success']}/{len(urls)} | "
            f"Items: {len(self.results)} | "
            f"Retries: {self.stats['retries']} | "
            f"Rate: {self.stats['success']/elapsed:.1f} pages/sec"
        )
        
        return self.results
    
    def export_csv(self, filepath: str):
        """Export results to CSV."""
        path = Path(filepath)
        with open(path, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=["url", "title", "price", "rating", "in_stock"])
            writer.writeheader()
            for item in self.results:
                writer.writerow(asdict(item))
        logger.info(f"Exported {len(self.results)} items to {path}")
    
    def export_json(self, filepath: str):
        """Export results to JSON."""
        path = Path(filepath)
        with open(path, "w", encoding="utf-8") as f:
            json.dump([asdict(item) for item in self.results], f, indent=2, ensure_ascii=False)
        logger.info(f"Exported {len(self.results)} items to {path}")

# Usage
async def main():
    scraper = ProductionScraper(
        max_concurrent=10,
        delay=0.2,
        max_retries=3,
        proxies=[]  # Add your proxies here
    )
    
    urls = [f"https://example.com/products?page={i}" for i in range(1, 101)]
    results = await scraper.scrape(urls)
    
    scraper.export_csv("products.csv")
    scraper.export_json("products.json")
    
    print(f"\nTop 5 results:")
    for item in results[:5]:
        print(f"  {item.title} — {item.price} (★{item.rating})")

if __name__ == "__main__":
    asyncio.run(main())

httpx vs Requests vs aiohttp vs Mantis API

| Feature | httpx | Requests | aiohttp | Mantis API |
|---|---|---|---|---|
| Async Support | ✅ Native | ❌ No | ✅ Native | ✅ Via any client |
| Sync Support | ✅ Yes | ✅ Yes | ❌ No | ✅ Via any client |
| HTTP/2 | ✅ Yes | ❌ No | ❌ No | ✅ Handled |
| Connection Pooling | ✅ Built-in | ✅ via Session | ✅ Built-in | N/A (serverless) |
| Timeout Granularity | ✅ 4 levels | ⚠️ Basic | ✅ 3 levels | N/A |
| Proxy Support | ✅ HTTP/SOCKS | ✅ HTTP/SOCKS | ✅ HTTP/SOCKS | ✅ Built-in rotation |
| JS Rendering | ❌ No | ❌ No | ❌ No | ✅ Yes |
| Anti-Bot Bypass | ❌ Manual | ❌ Manual | ❌ Manual | ✅ Automatic |
| AI Data Extraction | ❌ No | ❌ No | ❌ No | ✅ Built-in |
| Learning Curve | Low | Very Low | Medium | Very Low |
| Best For | Async scraping | Simple scripts | High concurrency | Production at scale |
| Cost | Free + infra | Free + infra | Free + infra | From $0 (100 free/mo) |

When to Use Each

The API Shortcut: Mantis

All the code above — async clients, proxy rotation, retries, headers, parsing — solves problems that a web scraping API handles automatically:

import httpx
import asyncio

async def scrape_with_mantis():
    """Replace 200 lines of scraping code with one API call."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://api.mantisapi.com/v1/scrape",
            headers={"x-api-key": "your-api-key"},
            json={
                "url": "https://example.com/products",
                "render_js": True,
                "extract": {
                    "selector": ".product-card",
                    "fields": {
                        "name": ".product-name",
                        "price": ".price",
                        "rating": ".rating"
                    }
                }
            }
        )
        
        data = response.json()
        for product in data["products"]:
            print(f"{product['name']} — {product['price']}")

asyncio.run(scrape_with_mantis())

Need Data at Scale? Skip the Infrastructure

Mantis handles proxies, headers, retries, JS rendering, and anti-detection. You write one API call — we handle the rest.

Start Free → 100 requests/month

When to Use httpx vs an API

FAQ


Next Steps