Build a Web Scraping Pipeline with Haystack 2.0

March 9, 2026 · 11 min read · Haystack · RAG · Python · Pipelines

Haystack by deepset is one of the most popular frameworks for building production-ready RAG (Retrieval-Augmented Generation) and NLP pipelines. Haystack 2.0 introduced a completely redesigned pipeline architecture with composable components, and adding web scraping to that pipeline gives your RAG systems access to real-time data instead of just static documents.

In this tutorial, you'll build custom Haystack components that connect to the Mantis API for scraping, screenshots, and AI extraction, then wire them into full RAG and agent pipelines.

Why Haystack for Web Scraping Pipelines?

Haystack 2.0 models every step as a typed component in a pipeline graph: each component declares its inputs and outputs, and the framework handles the data flow between them. That makes scraping a first-class pipeline stage you can compose with splitters, retrievers, and generators, rather than a preprocessing script bolted on the side.

Prerequisites

You'll need Python 3.9 or newer, the haystack-ai and httpx packages (pip install haystack-ai httpx), a Mantis API key for the scraping endpoints, and an OpenAI API key for the generation steps.

Building Custom Haystack Components

Haystack 2.0 uses the @component decorator to define pipeline components. Each component declares its inputs and outputs with type annotations.
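Before wiring in HTTP calls, it helps to see the bare contract: run() accepts keyword arguments matching the declared inputs and returns a dict whose keys match the names given to @component.output_types. A framework-free sketch of that shape (illustrative only, no Haystack import needed):

```python
from typing import List

class GreeterComponent:
    """Mimics the Haystack component contract without the framework:
    typed inputs in run()'s signature, outputs returned as a dict."""

    # In Haystack, this return shape would be declared with
    # @component.output_types(greetings=List[str])
    def run(self, names: List[str]) -> dict:
        return {"greetings": [f"Hello, {name}!" for name in names]}

result = GreeterComponent().run(names=["Ada", "Grace"])
```

The components below follow exactly this pattern, with the decorator letting the pipeline graph validate connections between outputs and inputs.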

WebScraper Component

import httpx
from haystack import component, Document
from typing import List

@component
class WebScraper:
    """Scrapes URLs and returns Haystack Documents."""

    def __init__(self, api_key: str, format: str = "markdown"):
        self.api_key = api_key
        self.base_url = "https://api.mantisapi.com/v1"
        self.format = format

    @component.output_types(documents=List[Document])
    def run(self, urls: List[str]):
        documents = []
        with httpx.Client(timeout=30) as client:
            for url in urls:
                try:
                    resp = client.post(
                        f"{self.base_url}/scrape",
                        headers={"x-api-key": self.api_key},
                        json={"url": url, "format": self.format, "wait_for": "networkidle"}
                    )
                    resp.raise_for_status()
                    data = resp.json()
                    documents.append(Document(
                        content=data.get("content", ""),
                        meta={"url": url, "source": "web_scrape", "title": data.get("title", "")}
                    ))
                except Exception as e:
                    documents.append(Document(
                        content=f"Error scraping {url}: {str(e)}",
                        meta={"url": url, "source": "web_scrape", "error": True}
                    ))
        return {"documents": documents}

DataExtractor Component

@component
class DataExtractor:
    """Extracts structured data from URLs using AI."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.mantisapi.com/v1"

    @component.output_types(documents=List[Document])
    def run(self, urls: List[str], prompt: str):
        documents = []
        with httpx.Client(timeout=60) as client:
            for url in urls:
                try:
                    resp = client.post(
                        f"{self.base_url}/extract",
                        headers={"x-api-key": self.api_key},
                        json={"url": url, "prompt": prompt, "wait_for": "networkidle"}
                    )
                    resp.raise_for_status()
                    data = resp.json()
                    documents.append(Document(
                        content=data.get("data", ""),
                        meta={"url": url, "source": "ai_extraction", "prompt": prompt}
                    ))
                except Exception as e:
                    documents.append(Document(
                        content=f"Error extracting from {url}: {str(e)}",
                        meta={"url": url, "source": "ai_extraction", "error": True}
                    ))
        return {"documents": documents}

ScreenshotCapture Component

@component
class ScreenshotCapture:
    """Takes screenshots of URLs and returns image URLs as Documents."""

    def __init__(self, api_key: str, width: int = 1280):
        self.api_key = api_key
        self.base_url = "https://api.mantisapi.com/v1"
        self.width = width

    @component.output_types(documents=List[Document])
    def run(self, urls: List[str]):
        documents = []
        with httpx.Client(timeout=30) as client:
            for url in urls:
                try:
                    resp = client.post(
                        f"{self.base_url}/screenshot",
                        headers={"x-api-key": self.api_key},
                        json={"url": url, "viewport": {"width": self.width, "height": 720}, "format": "png"}
                    )
                    resp.raise_for_status()
                    data = resp.json()
                    documents.append(Document(
                        content=f"Screenshot of {url}: {data.get('url', '')}",
                        meta={"url": url, "screenshot_url": data.get("url", ""), "source": "screenshot"}
                    ))
                except Exception as e:
                    documents.append(Document(
                        content=f"Error capturing {url}: {str(e)}",
                        meta={"url": url, "source": "screenshot", "error": True}
                    ))
        return {"documents": documents}

Pipeline 1: Web Scraping → RAG

The most powerful pattern is combining web scraping with RAG. Scrape live web data, store it in a document store, then retrieve and generate answers.

from haystack import Pipeline
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.writers import DocumentWriter
from haystack.components.preprocessors import DocumentSplitter

# Build the indexing pipeline
doc_store = InMemoryDocumentStore()

indexing_pipeline = Pipeline()
indexing_pipeline.add_component("scraper", WebScraper(api_key=MANTIS_KEY))
indexing_pipeline.add_component("splitter", DocumentSplitter(split_by="sentence", split_length=5))
indexing_pipeline.add_component("writer", DocumentWriter(document_store=doc_store))

# Connect: scraper โ†’ splitter โ†’ writer
indexing_pipeline.connect("scraper.documents", "splitter.documents")
indexing_pipeline.connect("splitter.documents", "writer.documents")

# Index some pages
indexing_pipeline.run({"scraper": {"urls": [
    "https://docs.example.com/api-reference",
    "https://docs.example.com/getting-started",
    "https://blog.example.com/latest-update"
]}})

Query Pipeline with Retrieved Context

from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator

query_pipeline = Pipeline()
query_pipeline.add_component("retriever", InMemoryBM25Retriever(document_store=doc_store))
query_pipeline.add_component("prompt", PromptBuilder(template="""
Answer the question based on the following web content:

{% for doc in documents %}
Source: {{ doc.meta.url }}
{{ doc.content }}
---
{% endfor %}

Question: {{ question }}
Answer:
"""))
query_pipeline.add_component("llm", OpenAIGenerator(model="gpt-4o"))

query_pipeline.connect("retriever.documents", "prompt.documents")
query_pipeline.connect("prompt", "llm")

# Ask a question
result = query_pipeline.run({
    "retriever": {"query": "What are the API rate limits?"},
    "prompt": {"question": "What are the API rate limits?"}
})
print(result["llm"]["replies"][0])

Pipeline 2: Live Web Research Agent

Build a pipeline that scrapes multiple URLs, extracts key data, and generates a research report, all in one run:

research_pipeline = Pipeline()
research_pipeline.add_component("scraper", WebScraper(api_key=MANTIS_KEY))
research_pipeline.add_component("prompt", PromptBuilder(template="""
You are a research analyst. Based on the following scraped web content, 
write a comprehensive research report.

{% for doc in documents %}
## Source: {{ doc.meta.url }}
{{ doc.content[:2000] }}
---
{% endfor %}

Research topic: {{ topic }}

Write a detailed report with key findings, data points, and recommendations.
"""))
research_pipeline.add_component("llm", OpenAIGenerator(model="gpt-4o"))

research_pipeline.connect("scraper.documents", "prompt.documents")
research_pipeline.connect("prompt", "llm")

# Run research
result = research_pipeline.run({
    "scraper": {"urls": [
        "https://stripe.com/pricing",
        "https://square.com/pricing",
        "https://adyen.com/pricing"
    ]},
    "prompt": {"topic": "Payment processing pricing comparison for SaaS startups"}
})
print(result["llm"]["replies"][0])

Pipeline 3: Competitor Monitoring

Schedule this pipeline to run daily and detect changes in competitor websites:

import hashlib
import json
from datetime import datetime

@component
class ChangeDetector:
    """Compares new documents against previously stored versions."""

    def __init__(self, cache_path: str = "competitor_cache.json"):
        self.cache_path = cache_path
        try:
            with open(cache_path) as f:
                self.cache = json.load(f)
        except FileNotFoundError:
            self.cache = {}

    @component.output_types(changed=List[Document], unchanged=List[Document])
    def run(self, documents: List[Document]):
        changed, unchanged = [], []
        new_cache = {}

        for doc in documents:
            url = doc.meta.get("url", "")
            # SHA-256 is stable across runs; Python's built-in hash() is
            # randomized per process, so it can't be persisted to a cache file
            content_hash = hashlib.sha256(doc.content.encode("utf-8")).hexdigest()
            new_cache[url] = {"hash": content_hash, "last_checked": datetime.now().isoformat()}

            if url in self.cache and self.cache[url]["hash"] == content_hash:
                unchanged.append(doc)
            else:
                doc.meta["change_detected"] = True
                doc.meta["previous_hash"] = self.cache.get(url, {}).get("hash")
                changed.append(doc)

        self.cache = new_cache
        with open(self.cache_path, "w") as f:
            json.dump(new_cache, f)

        return {"changed": changed, "unchanged": unchanged}

# Build monitoring pipeline
monitor_pipeline = Pipeline()
monitor_pipeline.add_component("scraper", WebScraper(api_key=MANTIS_KEY))
monitor_pipeline.add_component("detector", ChangeDetector())
monitor_pipeline.add_component("prompt", PromptBuilder(template="""
The following competitor pages have changed since last check:

{% for doc in documents %}
URL: {{ doc.meta.url }}
Content: {{ doc.content[:1500] }}
---
{% endfor %}

Summarize the key changes and their strategic implications.
"""))
monitor_pipeline.add_component("llm", OpenAIGenerator(model="gpt-4o"))

monitor_pipeline.connect("scraper.documents", "detector.documents")
monitor_pipeline.connect("detector.changed", "prompt.documents")
monitor_pipeline.connect("prompt", "llm")

Using Web Scraping as Agent Tools

Haystack 2.0 supports agent patterns with tool use. Here's how to register scraping functions as agent tools:

from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage

# Define tools for the chat model
tools = [
    {
        "type": "function",
        "function": {
            "name": "scrape_url",
            "description": "Scrape a webpage and return its content",
            "parameters": {
                "type": "object",
                "properties": {
                    "url": {"type": "string", "description": "URL to scrape"},
                    "format": {"type": "string", "enum": ["text", "markdown"], "default": "markdown"}
                },
                "required": ["url"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "extract_data",
            "description": "Extract structured data from a webpage using AI",
            "parameters": {
                "type": "object",
                "properties": {
                    "url": {"type": "string", "description": "URL to extract from"},
                    "prompt": {"type": "string", "description": "What data to extract"}
                },
                "required": ["url", "prompt"]
            }
        }
    }
]

generator = OpenAIChatGenerator(model="gpt-4o", generation_kwargs={"tools": tools})

messages = [
    ChatMessage.from_system("You are a research assistant with web scraping tools."),
    ChatMessage.from_user("Find the current pricing for Vercel Pro plan")
]

# Agent loop with tool execution
while True:
    result = generator.run(messages=messages)
    reply = result["replies"][0]

    if not reply.meta.get("tool_calls"):
        print(reply.content)
        break

    messages.append(reply)

    for tool_call in reply.meta["tool_calls"]:
        fn = tool_call["function"]
        args = json.loads(fn["arguments"])

        if fn["name"] == "scrape_url":
            scraper = WebScraper(api_key=MANTIS_KEY)
            docs = scraper.run(urls=[args["url"]])
            tool_result = docs["documents"][0].content
        elif fn["name"] == "extract_data":
            extractor = DataExtractor(api_key=MANTIS_KEY)
            docs = extractor.run(urls=[args["url"]], prompt=args["prompt"])
            tool_result = docs["documents"][0].content

        messages.append(ChatMessage.from_tool(
            tool_result, origin=tool_call
        ))
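As the tool list grows, an if/elif chain gets unwieldy. A registry mapping tool names to callables keeps the loop body constant as you add tools (sketch with stand-in handlers; in the loop above they would wrap WebScraper and DataExtractor):

```python
import json

# Stand-in handlers for illustration; real ones would call the components above.
def handle_scrape(url, format="markdown"):
    return f"scraped {url} as {format}"

def handle_extract(url, prompt):
    return f"extracted '{prompt}' from {url}"

TOOL_HANDLERS = {"scrape_url": handle_scrape, "extract_data": handle_extract}

def dispatch(tool_call):
    """Route an OpenAI-style tool call dict to its registered handler."""
    fn = tool_call["function"]
    handler = TOOL_HANDLERS.get(fn["name"])
    if handler is None:
        return f"Unknown tool: {fn['name']}"
    return handler(**json.loads(fn["arguments"]))

result = dispatch({
    "function": {"name": "scrape_url",
                 "arguments": json.dumps({"url": "https://example.com"})}
})
```

The agent loop then reduces to tool_result = dispatch(tool_call), and new tools only need a registry entry plus a matching schema in the tools list.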

Cost Optimization Tips

| Strategy | Implementation | Impact |
| --- | --- | --- |
| Cache scraped documents | Use ChangeDetector or a Redis cache before scraping | 50-80% fewer API calls |
| Split before embedding | Use DocumentSplitter to chunk content | Better retrieval accuracy |
| Filter URLs | Skip unchanged pages with hash comparison | Only scrape what's new |
| Text format for RAG | Use format="text" when markdown overhead isn't needed | Smaller documents, faster processing |
| Batch pipeline runs | Pass multiple URLs in a single pipeline run | Better HTTP connection reuse |
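The caching row can start as simple as an in-memory TTL cache keyed by URL, consulted before calling the scraper (illustrative sketch; swap in Redis for anything multi-process):

```python
import time

class TTLCache:
    """Tiny URL-keyed cache with per-entry expiry."""

    def __init__(self, ttl_seconds=3600, clock=time.time):
        self.ttl = ttl_seconds
        self.clock = clock  # injectable for deterministic testing
        self._store = {}

    def get(self, url):
        entry = self._store.get(url)
        if entry is None:
            return None
        content, stored_at = entry
        if self.clock() - stored_at > self.ttl:
            del self._store[url]  # expired: evict and report a miss
            return None
        return content

    def set(self, url, content):
        self._store[url] = (content, self.clock())

# Example with a fake clock so expiry is deterministic
t = {"now": 0.0}
cache = TTLCache(ttl_seconds=60, clock=lambda: t["now"])
cache.set("https://example.com", "<content>")
hit = cache.get("https://example.com")   # fresh entry
t["now"] = 120.0
miss = cache.get("https://example.com")  # expired after 60s TTL
```

In WebScraper.run you would check the cache per URL, skip the HTTP call on a hit, and cache.set after each successful scrape.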

Start Building with Haystack + Mantis

Get 100 free API calls per month. No credit card required.

Get Your API Key →

What You Learned

Haystack's component-based pipeline architecture is a natural fit for web scraping workflows. Each component does one thing well, and the pipeline graph handles data flow, error propagation, and serialization. Combined with the Mantis API, you can build production-grade RAG systems that stay current with live web data.

Next steps: Check out our quickstart guide to get your API key, or explore our other framework integrations.