Build a Web Scraping Pipeline with Haystack 2.0
Haystack by deepset is one of the most popular frameworks for building production-ready RAG (Retrieval-Augmented Generation) and NLP pipelines. Haystack 2.0 introduced a completely redesigned pipeline architecture with composable components, and adding web scraping to that pipeline gives your RAG systems access to real-time data instead of just static documents.
In this tutorial, you'll build custom Haystack components that connect to the Mantis API for scraping, screenshots, and AI extraction, then wire them into full RAG and agent pipelines.
Why Haystack for Web Scraping Pipelines?
- Component-based architecture – build reusable, composable pipeline components
- Pipeline graphs – connect components with typed inputs/outputs, branch and merge data flows
- Production-ready – built-in serialization, logging, and deployment tools
- RAG-native – first-class support for document stores, retrievers, and generators
- Agent support – Haystack 2.0 includes agent and tool-use patterns
Prerequisites
- A Mantis API key (free tier: 100 calls/month)
- Python 3.10+
- An OpenAI API key (for the generator components)

Install the dependencies:

pip install haystack-ai httpx
Building Custom Haystack Components
Haystack 2.0 uses the @component decorator to define pipeline components. Each component declares its inputs and outputs with type annotations.
WebScraper Component
import httpx
from haystack import component, Document
from typing import List
@component
class WebScraper:
"""Scrapes URLs and returns Haystack Documents."""
def __init__(self, api_key: str, format: str = "markdown"):
self.api_key = api_key
self.base_url = "https://api.mantisapi.com/v1"
self.format = format
@component.output_types(documents=List[Document])
def run(self, urls: List[str]):
documents = []
with httpx.Client(timeout=30) as client:
for url in urls:
try:
resp = client.post(
f"{self.base_url}/scrape",
headers={"x-api-key": self.api_key},
json={"url": url, "format": self.format, "wait_for": "networkidle"}
)
resp.raise_for_status()
data = resp.json()
documents.append(Document(
content=data.get("content", ""),
meta={"url": url, "source": "web_scrape", "title": data.get("title", "")}
))
except Exception as e:
documents.append(Document(
content=f"Error scraping {url}: {str(e)}",
meta={"url": url, "source": "web_scrape", "error": True}
))
return {"documents": documents}
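Note the design choice in `run()`: a failed URL becomes an error Document rather than a raised exception, so one bad page never aborts the batch. The pattern in isolation, sketched with plain dicts and a stub fetch function (both stand-ins, not the real API):

```python
def scrape_batch(urls, fetch):
    """Collect per-URL results; failures become error records, not exceptions."""
    docs = []
    for url in urls:
        try:
            docs.append({"content": fetch(url), "meta": {"url": url}})
        except Exception as e:
            docs.append({"content": f"Error scraping {url}: {e}",
                         "meta": {"url": url, "error": True}})
    return docs

def stub_fetch(url):
    # Simulate a server error for one of the URLs
    if "bad" in url:
        raise ValueError("HTTP 500")
    return f"<content of {url}>"

docs = scrape_batch(["https://ok.example", "https://bad.example"], stub_fetch)
print([d["meta"].get("error", False) for d in docs])  # [False, True]
```

Downstream components can then filter on the `error` flag in `meta` instead of wrapping the whole pipeline run in a try/except.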
DataExtractor Component
@component
class DataExtractor:
"""Extracts structured data from URLs using AI."""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.mantisapi.com/v1"
@component.output_types(documents=List[Document])
def run(self, urls: List[str], prompt: str):
documents = []
with httpx.Client(timeout=60) as client:
for url in urls:
try:
resp = client.post(
f"{self.base_url}/extract",
headers={"x-api-key": self.api_key},
json={"url": url, "prompt": prompt, "wait_for": "networkidle"}
)
resp.raise_for_status()
data = resp.json()
documents.append(Document(
content=data.get("data", ""),
meta={"url": url, "source": "ai_extraction", "prompt": prompt}
))
except Exception as e:
documents.append(Document(
content=f"Error extracting from {url}: {str(e)}",
meta={"url": url, "source": "ai_extraction", "error": True}
))
return {"documents": documents}
ScreenshotCapture Component
@component
class ScreenshotCapture:
"""Takes screenshots of URLs and returns image URLs as Documents."""
def __init__(self, api_key: str, width: int = 1280):
self.api_key = api_key
self.base_url = "https://api.mantisapi.com/v1"
self.width = width
@component.output_types(documents=List[Document])
def run(self, urls: List[str]):
documents = []
with httpx.Client(timeout=30) as client:
for url in urls:
try:
resp = client.post(
f"{self.base_url}/screenshot",
headers={"x-api-key": self.api_key},
json={"url": url, "viewport": {"width": self.width, "height": 720}, "format": "png"}
)
resp.raise_for_status()
data = resp.json()
documents.append(Document(
content=f"Screenshot of {url}: {data.get('url', '')}",
meta={"url": url, "screenshot_url": data.get("url", ""), "source": "screenshot"}
))
except Exception as e:
documents.append(Document(
content=f"Error capturing {url}: {str(e)}",
meta={"url": url, "source": "screenshot", "error": True}
))
return {"documents": documents}
Pipeline 1: Web Scraping → RAG
The most powerful pattern is combining web scraping with RAG. Scrape live web data, store it in a document store, then retrieve and generate answers.
from haystack import Pipeline
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.writers import DocumentWriter
from haystack.components.preprocessors import DocumentSplitter
# Build the indexing pipeline
doc_store = InMemoryDocumentStore()
indexing_pipeline = Pipeline()
indexing_pipeline.add_component("scraper", WebScraper(api_key=MANTIS_KEY))
indexing_pipeline.add_component("splitter", DocumentSplitter(split_by="sentence", split_length=5))
indexing_pipeline.add_component("writer", DocumentWriter(document_store=doc_store))
# Connect: scraper → splitter → writer
indexing_pipeline.connect("scraper.documents", "splitter.documents")
indexing_pipeline.connect("splitter.documents", "writer.documents")
# Index some pages
indexing_pipeline.run({"scraper": {"urls": [
"https://docs.example.com/api-reference",
"https://docs.example.com/getting-started",
"https://blog.example.com/latest-update"
]}})
Query Pipeline with Retrieved Context
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
query_pipeline = Pipeline()
query_pipeline.add_component("retriever", InMemoryBM25Retriever(document_store=doc_store))
query_pipeline.add_component("prompt", PromptBuilder(template="""
Answer the question based on the following web content:
{% for doc in documents %}
Source: {{ doc.meta.url }}
{{ doc.content }}
---
{% endfor %}
Question: {{ question }}
Answer:
"""))
query_pipeline.add_component("llm", OpenAIGenerator(model="gpt-4o"))
query_pipeline.connect("retriever.documents", "prompt.documents")
query_pipeline.connect("prompt", "llm")
# Ask a question
result = query_pipeline.run({
"retriever": {"query": "What are the API rate limits?"},
"prompt": {"question": "What are the API rate limits?"}
})
print(result["llm"]["replies"][0])
Pipeline 2: Live Web Research Agent
Build a pipeline that scrapes multiple URLs, extracts key data, and generates a research report, all in one run:
research_pipeline = Pipeline()
research_pipeline.add_component("scraper", WebScraper(api_key=MANTIS_KEY))
research_pipeline.add_component("prompt", PromptBuilder(template="""
You are a research analyst. Based on the following scraped web content,
write a comprehensive research report.
{% for doc in documents %}
## Source: {{ doc.meta.url }}
{{ doc.content[:2000] }}
---
{% endfor %}
Research topic: {{ topic }}
Write a detailed report with key findings, data points, and recommendations.
"""))
research_pipeline.add_component("llm", OpenAIGenerator(model="gpt-4o"))
research_pipeline.connect("scraper.documents", "prompt.documents")
research_pipeline.connect("prompt", "llm")
# Run research
result = research_pipeline.run({
"scraper": {"urls": [
"https://stripe.com/pricing",
"https://square.com/pricing",
"https://adyen.com/pricing"
]},
"prompt": {"topic": "Payment processing pricing comparison for SaaS startups"}
})
print(result["llm"]["replies"][0])
Pipeline 3: Competitor Monitoring
Schedule this pipeline to run daily and detect changes in competitor websites:
import hashlib
import json
from datetime import datetime
@component
class ChangeDetector:
    """Compares new documents against previously stored versions."""
    def __init__(self, cache_path: str = "competitor_cache.json"):
        self.cache_path = cache_path
        try:
            with open(cache_path) as f:
                self.cache = json.load(f)
        except FileNotFoundError:
            self.cache = {}
    @component.output_types(changed=List[Document], unchanged=List[Document])
    def run(self, documents: List[Document]):
        changed, unchanged = [], []
        new_cache = {}
        for doc in documents:
            url = doc.meta.get("url", "")
            # sha256 is stable across runs; Python's built-in hash() is
            # randomized per process, so it can't be persisted to disk
            content_hash = hashlib.sha256(doc.content.encode()).hexdigest()
            new_cache[url] = {"hash": content_hash, "last_checked": datetime.now().isoformat()}
            if url in self.cache and self.cache[url]["hash"] == content_hash:
                unchanged.append(doc)
            else:
                doc.meta["change_detected"] = True
                doc.meta["previous_hash"] = self.cache.get(url, {}).get("hash")
                changed.append(doc)
        self.cache = new_cache
        with open(self.cache_path, "w") as f:
            json.dump(new_cache, f)
        return {"changed": changed, "unchanged": unchanged}
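The cache-compare logic can be sanity-checked without Haystack at all. A stdlib-only sketch, with plain dicts standing in for Documents:

```python
import hashlib

def detect_changes(pages, cache):
    """pages: list of {'url': ..., 'content': ...}; cache: {url: hash}."""
    changed, unchanged, new_cache = [], [], {}
    for page in pages:
        # sha256 is stable across processes, unlike Python's built-in hash()
        h = hashlib.sha256(page["content"].encode()).hexdigest()
        new_cache[page["url"]] = h
        if cache.get(page["url"]) == h:
            unchanged.append(page)
        else:
            changed.append(page)
    return changed, unchanged, new_cache

pages = [{"url": "a", "content": "v1"}, {"url": "b", "content": "v2"}]
_, _, cache = detect_changes(pages, {})   # first run: everything counts as changed
pages[1]["content"] = "v3"                # page b changes before the second run
changed, unchanged, _ = detect_changes(pages, cache)
print([p["url"] for p in changed])    # ['b']
print([p["url"] for p in unchanged])  # ['a']
```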
# Build monitoring pipeline
monitor_pipeline = Pipeline()
monitor_pipeline.add_component("scraper", WebScraper(api_key=MANTIS_KEY))
monitor_pipeline.add_component("detector", ChangeDetector())
monitor_pipeline.add_component("prompt", PromptBuilder(template="""
The following competitor pages have changed since last check:
{% for doc in documents %}
URL: {{ doc.meta.url }}
Content: {{ doc.content[:1500] }}
---
{% endfor %}
Summarize the key changes and their strategic implications.
"""))
monitor_pipeline.add_component("llm", OpenAIGenerator(model="gpt-4o"))
monitor_pipeline.connect("scraper.documents", "detector.documents")
monitor_pipeline.connect("detector.changed", "prompt.documents")
monitor_pipeline.connect("prompt", "llm")
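To actually run this daily, any scheduler works; for example, a crontab entry (the script path and virtualenv location below are placeholders for your own setup):

```shell
# Run the monitor every day at 07:00 (edit with `crontab -e`)
0 7 * * * /opt/venv/bin/python /path/to/monitor_pipeline.py >> /var/log/monitor.log 2>&1
```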
Using Web Scraping as Agent Tools
Haystack 2.0 supports agent patterns with tool use. Here's how to register scraping functions as agent tools:
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
# Define tools for the chat model
tools = [
{
"type": "function",
"function": {
"name": "scrape_url",
"description": "Scrape a webpage and return its content",
"parameters": {
"type": "object",
"properties": {
"url": {"type": "string", "description": "URL to scrape"},
"format": {"type": "string", "enum": ["text", "markdown"], "default": "markdown"}
},
"required": ["url"]
}
}
},
{
"type": "function",
"function": {
"name": "extract_data",
"description": "Extract structured data from a webpage using AI",
"parameters": {
"type": "object",
"properties": {
"url": {"type": "string", "description": "URL to extract from"},
"prompt": {"type": "string", "description": "What data to extract"}
},
"required": ["url", "prompt"]
}
}
}
]
generator = OpenAIChatGenerator(model="gpt-4o", generation_kwargs={"tools": tools})
messages = [
ChatMessage.from_system("You are a research assistant with web scraping tools."),
ChatMessage.from_user("Find the current pricing for Vercel Pro plan")
]
# Agent loop with tool execution
while True:
result = generator.run(messages=messages)
reply = result["replies"][0]
if not reply.meta.get("tool_calls"):
print(reply.content)
break
    messages.append(reply)
    for tool_call in reply.meta["tool_calls"]:
        fn = tool_call["function"]
        args = json.loads(fn["arguments"])
        if fn["name"] == "scrape_url":
            scraper = WebScraper(api_key=MANTIS_KEY)
            docs = scraper.run(urls=[args["url"]])
            tool_result = docs["documents"][0].content
        elif fn["name"] == "extract_data":
            extractor = DataExtractor(api_key=MANTIS_KEY)
            docs = extractor.run(urls=[args["url"]], prompt=args["prompt"])
            tool_result = docs["documents"][0].content
        else:
            # Guard against unknown tool names so tool_result is always bound
            tool_result = f"Unknown tool: {fn['name']}"
        messages.append(ChatMessage.from_tool(tool_result, origin=tool_call))
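The if/elif dispatch inside the loop generalizes to a registry of handlers, which scales better as you add tools. A stdlib sketch of that refactor (the handler bodies here are stubs, not real scrapes):

```python
import json

def dispatch_tool_call(tool_call, handlers):
    """Route an OpenAI-style tool-call dict to a registered handler."""
    fn = tool_call["function"]
    args = json.loads(fn["arguments"])   # arguments arrive as a JSON string
    handler = handlers.get(fn["name"])
    if handler is None:
        return f"Unknown tool: {fn['name']}"
    return handler(**args)

# Stub handlers; in the real loop these would call WebScraper / DataExtractor
handlers = {
    "scrape_url": lambda url, format="markdown": f"scraped {url} as {format}",
}
call = {"function": {"name": "scrape_url",
                     "arguments": '{"url": "https://example.com"}'}}
print(dispatch_tool_call(call, handlers))  # scraped https://example.com as markdown
```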
Cost Optimization Tips
| Strategy | Implementation | Impact |
|---|---|---|
| Cache scraped documents | Use ChangeDetector or Redis cache before scraping | 50-80% fewer API calls |
| Split before embedding | Use DocumentSplitter to chunk content | Better retrieval accuracy |
| Filter URLs | Skip unchanged pages with hash comparison | Only scrape what's new |
| Text format for RAG | Use format="text" when markdown overhead isn't needed | Smaller documents, faster processing |
| Batch pipeline runs | Pass multiple URLs in a single pipeline run | Better HTTP connection reuse |
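The first row of the table can be as simple as a file-backed cache keyed by URL hash, checked before any scrape call. A minimal stdlib sketch (`fake_fetch` stands in for the real API call):

```python
import hashlib
import json
from pathlib import Path

class ScrapeCache:
    """File-backed cache so repeat runs skip already-scraped URLs."""
    def __init__(self, path):
        self.path = Path(path)
        self.data = json.loads(self.path.read_text()) if self.path.exists() else {}

    def get_or_fetch(self, url, fetch):
        key = hashlib.sha256(url.encode()).hexdigest()
        if key not in self.data:
            self.data[key] = fetch(url)          # only hit the API on a miss
            self.path.write_text(json.dumps(self.data))
        return self.data[key]

# Demo with a stub fetch; count how often the "API" is actually called
Path("/tmp/demo_scrape_cache.json").unlink(missing_ok=True)
calls = []
def fake_fetch(url):
    calls.append(url)
    return f"content of {url}"

cache = ScrapeCache("/tmp/demo_scrape_cache.json")
cache.get_or_fetch("https://example.com", fake_fetch)
cache.get_or_fetch("https://example.com", fake_fetch)  # served from cache
print(len(calls))  # 1
```

For production, swap the JSON file for Redis and add a TTL so stale pages are eventually re-scraped.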
Start Building with Haystack + Mantis
Get 100 free API calls per month. No credit card required.
Get Your API Key →
What You Learned
- How to build custom Haystack 2.0 components for scraping, extraction, and screenshots
- RAG + web scraping – index live web data and query it with retrieval-augmented generation
- Research pipelines – scrape multiple sources and generate reports in one pipeline run
- Competitor monitoring – detect changes with hash-based change detection
- Agent tool use – register scraping functions as tools for chat-based agents
- Cost optimization – caching, filtering, and batching strategies
Haystack's component-based pipeline architecture is a natural fit for web scraping workflows. Each component does one thing well, and the pipeline graph handles data flow, error propagation, and serialization. Combined with the Mantis API, you can build production-grade RAG systems that stay current with live web data.
Next steps: Check out our quickstart guide to get your API key, or explore our other framework integrations.