Web Scraping for Supply Chain & Logistics: How AI Agents Track Shipments, Inventory & Supplier Data in 2026
Supply chain visibility is a $4.2 billion market — and it exists because most companies can't see what's happening across their supply chain in real time. Shipments get delayed without warning. Supplier prices change without notice. Port congestion causes cascading disruptions that take weeks to untangle.
Enterprise visibility platforms like FourKites, project44, and Resilinc charge $5,000–$50,000/month to solve this problem. They aggregate carrier data, monitor disruptions, and provide dashboards. But they're expensive, rigid, and often don't cover niche suppliers or regional carriers.
What if you could build your own supply chain intelligence system using AI agents? One that scrapes carrier tracking pages, monitors supplier pricing, detects port congestion, and alerts you to disruptions — all for a fraction of the cost?
In this guide, you'll build exactly that using Python, the WebPerception API, and AI-powered analysis.
Why Supply Chains Need Web Scraping
Modern supply chains are fragmented across dozens of systems that don't talk to each other:
- Carrier portals: Each shipping line, freight forwarder, and trucking company has its own tracking portal with different formats
- Supplier websites: Pricing, lead times, and MOQs change constantly — often without notification
- Port authorities: Vessel schedules, congestion data, and berth availability are published on government websites
- Commodity exchanges: Raw material prices fluctuate daily on exchanges and trading platforms
- Regulatory sites: Tariff changes, trade restrictions, and customs requirements are buried in government portals
- News & weather: Natural disasters, strikes, and geopolitical events that disrupt supply chains
No single platform covers all of these. AI agents can bridge the gap by scraping data from any source and unifying it into a single intelligence layer.
Architecture: AI-Powered Supply Chain Intelligence Pipeline
Here's the complete system we'll build:
- Source Discovery — Identify carrier portals, supplier sites, port authorities, and commodity feeds to monitor
- AI Extraction — Scrape shipment status, pricing, inventory levels, and congestion data with structured extraction
- Storage & Tracking — SQLite database with historical data for trend analysis and anomaly detection
- Disruption Detection — Identify shipment delays, price spikes, low inventory, and port congestion automatically
- AI Analysis — LLM-powered risk assessment, impact analysis, and alternative supplier recommendations
- Alerts & Reports — Slack notifications for disruptions, daily supply chain briefings
Step 1: Define Your Supply Chain Data Models
First, structure the data you need to extract from supply chain sources:
```python
from pydantic import BaseModel
from typing import Optional, List
from datetime import date, datetime


class ShipmentStatus(BaseModel):
    """Structured shipment tracking data from carrier portals."""
    tracking_number: str
    carrier: str
    origin: str
    destination: str
    current_location: str
    status: str  # "in_transit", "at_port", "customs_hold", "delivered", "delayed"
    estimated_arrival: Optional[date] = None
    original_eta: Optional[date] = None
    delay_days: int = 0
    vessel_name: Optional[str] = None
    container_id: Optional[str] = None
    last_update: Optional[datetime] = None
    milestones: List[str] = []


class SupplierPrice(BaseModel):
    """Structured supplier pricing and availability data."""
    supplier_name: str
    product_name: str
    sku: str
    unit_price: float
    currency: str = "USD"
    moq: int  # minimum order quantity
    lead_time_days: int
    stock_status: str  # "in_stock", "low_stock", "out_of_stock", "made_to_order"
    quantity_available: Optional[int] = None
    last_updated: Optional[date] = None
    certifications: List[str] = []  # "ISO9001", "FDA", "CE", etc.


class InventoryLevel(BaseModel):
    """Warehouse inventory tracking data."""
    warehouse_location: str
    sku: str
    product_name: str
    quantity_on_hand: int
    quantity_reserved: int
    quantity_available: int
    reorder_point: int
    days_of_supply: float
    last_receipt_date: Optional[date] = None
    next_expected_receipt: Optional[date] = None


class PortCongestion(BaseModel):
    """Port congestion and vessel schedule data."""
    port_name: str
    port_code: str
    vessels_at_anchor: int
    average_wait_days: float
    berth_utilization_pct: float
    vessels_expected_7d: int
    congestion_level: str  # "low", "moderate", "high", "critical"
    last_updated: Optional[datetime] = None
    notable_delays: List[str] = []
```
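A quick sanity check on why typed models matter here: Pydantic coerces the loosely typed JSON an AI extractor returns (ISO date strings, missing optionals) into validated Python objects, and rejects payloads missing required fields before they reach your database. A minimal sketch, using an abbreviated copy of the `ShipmentStatus` model above and a made-up payload:

```python
from datetime import date
from typing import List, Optional

from pydantic import BaseModel, ValidationError


class ShipmentStatus(BaseModel):
    # Abbreviated copy of the model above, so this snippet runs standalone
    tracking_number: str
    carrier: str
    status: str
    estimated_arrival: Optional[date] = None
    delay_days: int = 0
    milestones: List[str] = []


# Hypothetical payload, shaped like what an AI extractor might return
raw = {
    "tracking_number": "MAEU1234567",
    "carrier": "maersk",
    "status": "in_transit",
    "estimated_arrival": "2026-03-14",  # ISO string coerced to a date
    "milestones": ["Departed Shanghai", "Transshipment at Busan"],
}

shipment = ShipmentStatus(**raw)
print(shipment.estimated_arrival, shipment.delay_days)  # defaults filled in

# A payload missing required fields fails loudly instead of polluting the DB
try:
    ShipmentStatus(tracking_number="X")
except ValidationError as exc:
    print(f"rejected: {len(exc.errors())} invalid fields")
```

Validating at ingestion keeps garbage out of the historical tables that the anomaly detection in Step 4 depends on.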
Step 2: Scrape Supply Chain Data with AI Extraction
The WebPerception API handles complex carrier portals and supplier sites that require JavaScript rendering:
```python
import requests
import json
from datetime import date

MANTIS_API_KEY = "your-api-key"
BASE_URL = "https://api.mantisapi.com"


def scrape_shipment_status(carrier: str, tracking_number: str) -> dict:
    """Scrape shipment status from a carrier tracking portal."""
    # Map carriers to their tracking URLs
    carrier_urls = {
        "maersk": f"https://www.maersk.com/tracking/{tracking_number}",
        "msc": f"https://www.msc.com/track-a-shipment?trackingNumber={tracking_number}",
        "cosco": f"https://elines.coscoshipping.com/ebtracking/public/containers/{tracking_number}",
        "hapag_lloyd": f"https://www.hapag-lloyd.com/en/online-business/track/track-by-booking-solution.html?blno={tracking_number}",
        "ups": f"https://www.ups.com/track?tracknum={tracking_number}",
        "fedex": f"https://www.fedex.com/fedextrack/?trknbr={tracking_number}",
    }
    url = carrier_urls.get(carrier.lower())
    if not url:
        return {"error": f"Unknown carrier: {carrier}"}

    response = requests.post(
        f"{BASE_URL}/extract",
        headers={"Authorization": f"Bearer {MANTIS_API_KEY}"},
        json={
            "url": url,
            "schema": {
                "type": "object",
                "properties": {
                    "tracking_number": {"type": "string"},
                    "current_location": {"type": "string"},
                    "status": {"type": "string"},
                    "estimated_arrival": {"type": "string"},
                    "vessel_name": {"type": "string"},
                    "container_id": {"type": "string"},
                    "milestones": {
                        "type": "array",
                        "items": {"type": "string"}
                    }
                }
            },
            "wait_for": "networkidle",
            "timeout": 30000
        }
    )
    if response.ok:
        data = response.json()
        result = data.get("extracted", {})
        result["carrier"] = carrier
        result["tracking_number"] = tracking_number
        return result
    return {"error": response.text}


def scrape_supplier_prices(supplier_url: str, products: list) -> list:
    """Scrape current pricing and availability from a supplier catalog."""
    response = requests.post(
        f"{BASE_URL}/extract",
        headers={"Authorization": f"Bearer {MANTIS_API_KEY}"},
        json={
            "url": supplier_url,
            "schema": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "product_name": {"type": "string"},
                        "sku": {"type": "string"},
                        "unit_price": {"type": "number"},
                        "currency": {"type": "string"},
                        "moq": {"type": "integer"},
                        "lead_time_days": {"type": "integer"},
                        "stock_status": {"type": "string"},
                        "quantity_available": {"type": "integer"},
                        "certifications": {
                            "type": "array",
                            "items": {"type": "string"}
                        }
                    }
                }
            },
            "wait_for": "networkidle",
            "timeout": 30000
        }
    )
    if response.ok:
        return response.json().get("extracted", [])
    return []


def scrape_port_congestion(port_url: str) -> dict:
    """Scrape port congestion data from port authority websites."""
    response = requests.post(
        f"{BASE_URL}/extract",
        headers={"Authorization": f"Bearer {MANTIS_API_KEY}"},
        json={
            "url": port_url,
            "schema": {
                "type": "object",
                "properties": {
                    "port_name": {"type": "string"},
                    "vessels_at_anchor": {"type": "integer"},
                    "average_wait_days": {"type": "number"},
                    "berth_utilization_pct": {"type": "number"},
                    "vessels_expected_7d": {"type": "integer"},
                    "notable_delays": {
                        "type": "array",
                        "items": {"type": "string"}
                    }
                }
            },
            "wait_for": "networkidle",
            "timeout": 30000
        }
    )
    if response.ok:
        return response.json().get("extracted", {})
    return {}
```
Step 3: Store Supply Chain Data
Track everything in SQLite for historical analysis and trend detection:
```python
import sqlite3
import json  # needed for serializing milestone lists below
from datetime import datetime


def init_supply_chain_db(db_path: str = "supply_chain_intel.db"):
    """Initialize the supply chain intelligence database."""
    conn = sqlite3.connect(db_path)
    c = conn.cursor()
    c.execute("""CREATE TABLE IF NOT EXISTS shipments (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        tracking_number TEXT,
        carrier TEXT,
        origin TEXT,
        destination TEXT,
        current_location TEXT,
        status TEXT,
        estimated_arrival DATE,
        original_eta DATE,
        delay_days INTEGER DEFAULT 0,
        vessel_name TEXT,
        container_id TEXT,
        milestones TEXT
    )""")
    c.execute("""CREATE TABLE IF NOT EXISTS supplier_prices (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        supplier_name TEXT,
        product_name TEXT,
        sku TEXT,
        unit_price REAL,
        currency TEXT DEFAULT 'USD',
        moq INTEGER,
        lead_time_days INTEGER,
        stock_status TEXT,
        quantity_available INTEGER
    )""")
    c.execute("""CREATE TABLE IF NOT EXISTS port_congestion (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        port_name TEXT,
        port_code TEXT,
        vessels_at_anchor INTEGER,
        average_wait_days REAL,
        berth_utilization_pct REAL,
        vessels_expected_7d INTEGER,
        congestion_level TEXT
    )""")
    c.execute("""CREATE TABLE IF NOT EXISTS supply_chain_alerts (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        alert_type TEXT,
        severity TEXT,
        category TEXT,
        description TEXT,
        metadata TEXT
    )""")
    c.execute("CREATE INDEX IF NOT EXISTS idx_ship_tracking ON shipments(tracking_number)")
    c.execute("CREATE INDEX IF NOT EXISTS idx_supplier_sku ON supplier_prices(sku, supplier_name)")
    c.execute("CREATE INDEX IF NOT EXISTS idx_port_name ON port_congestion(port_name)")
    conn.commit()
    return conn


def store_shipment(conn, shipment: dict):
    """Store a shipment tracking record."""
    conn.execute("""
        INSERT INTO shipments (tracking_number, carrier, origin, destination,
            current_location, status, estimated_arrival, original_eta,
            delay_days, vessel_name, container_id, milestones)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, (
        shipment.get("tracking_number"), shipment.get("carrier"),
        shipment.get("origin"), shipment.get("destination"),
        shipment.get("current_location"), shipment.get("status"),
        shipment.get("estimated_arrival"), shipment.get("original_eta"),
        shipment.get("delay_days", 0), shipment.get("vessel_name"),
        shipment.get("container_id"),
        json.dumps(shipment.get("milestones", []))
    ))
    conn.commit()


def store_supplier_price(conn, price: dict):
    """Store a supplier price record."""
    conn.execute("""
        INSERT INTO supplier_prices (supplier_name, product_name, sku,
            unit_price, currency, moq, lead_time_days, stock_status,
            quantity_available)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, (
        price.get("supplier_name"), price.get("product_name"),
        price.get("sku"), price.get("unit_price"),
        price.get("currency", "USD"), price.get("moq"),
        price.get("lead_time_days"), price.get("stock_status"),
        price.get("quantity_available")
    ))
    conn.commit()
```
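One gap worth closing before moving on: the shipments table stores `delay_days` and `original_eta`, but carrier pages typically only expose the current ETA. A small helper (hypothetical, not part of the API above) can derive the delay by comparing each newly scraped ETA against the earliest ETA recorded for that tracking number, assuming ISO-formatted date strings, which is how SQLite stores the `DATE` columns here:

```python
import sqlite3
from datetime import date


def compute_delay_days(conn, tracking_number: str, new_eta: str) -> int:
    """Derive delay_days by comparing the newly scraped ETA against the
    earliest ETA ever recorded for this shipment. Hypothetical helper;
    assumes ISO-formatted date strings in the estimated_arrival column."""
    row = conn.execute(
        """SELECT estimated_arrival FROM shipments
           WHERE tracking_number = ? AND estimated_arrival IS NOT NULL
           ORDER BY scraped_at ASC LIMIT 1""",
        (tracking_number,),
    ).fetchone()
    if not row or not new_eta:
        return 0  # no baseline yet, or no ETA scraped this run
    baseline = date.fromisoformat(row[0])
    current = date.fromisoformat(new_eta)
    return max((current - baseline).days, 0)  # early arrivals count as 0
```

Called between scraping and storing, it lets you populate `shipment["delay_days"]` (and keep `original_eta` as that first baseline) so the delay detection in Step 4 has real numbers to work with.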
Step 4: Detect Supply Chain Disruptions
Automated anomaly detection catches problems before they cascade:
```python
def detect_shipment_delays(conn) -> list:
    """Detect shipments that are delayed or at risk."""
    alerts = []
    # Find shipments where ETA has slipped
    delayed = conn.execute("""
        SELECT DISTINCT s1.tracking_number, s1.carrier, s1.destination,
               s1.estimated_arrival, s1.status, s1.current_location
        FROM shipments s1
        INNER JOIN (
            SELECT tracking_number, MAX(scraped_at) as latest
            FROM shipments
            GROUP BY tracking_number
        ) s2 ON s1.tracking_number = s2.tracking_number
            AND s1.scraped_at = s2.latest
        WHERE s1.status IN ('delayed', 'customs_hold')
           OR s1.delay_days > 2
    """).fetchall()

    for tracking, carrier, dest, eta, status, location in delayed:
        severity = "HIGH" if status == "customs_hold" else "MEDIUM"
        alerts.append({
            "type": "SHIPMENT_DELAY",
            "severity": severity,
            "category": "logistics",
            "description": f"{carrier} {tracking} to {dest}: "
                           f"status={status}, currently at {location}, "
                           f"ETA: {eta}"
        })
    return alerts


def detect_price_spikes(conn) -> list:
    """Detect supplier price increases >15%."""
    alerts = []
    spikes = conn.execute("""
        SELECT sp1.supplier_name, sp1.product_name, sp1.sku,
               sp1.unit_price as current_price,
               sp2.avg_price as previous_avg,
               ((sp1.unit_price - sp2.avg_price) / sp2.avg_price * 100) as change_pct
        FROM supplier_prices sp1
        INNER JOIN (
            SELECT sku, supplier_name, AVG(unit_price) as avg_price
            FROM supplier_prices
            WHERE scraped_at < datetime('now', '-24 hours')
              AND scraped_at > datetime('now', '-14 days')
            GROUP BY sku, supplier_name
        ) sp2 ON sp1.sku = sp2.sku AND sp1.supplier_name = sp2.supplier_name
        WHERE sp1.scraped_at > datetime('now', '-4 hours')
          AND sp1.unit_price > sp2.avg_price * 1.15
    """).fetchall()

    for supplier, product, sku, current, previous, change in spikes:
        alerts.append({
            "type": "PRICE_SPIKE",
            "severity": "HIGH" if change > 25 else "MEDIUM",
            "category": "procurement",
            "description": f"{supplier} — {product} ({sku}): "
                           f"${previous:.2f} → ${current:.2f} ({change:+.1f}%)"
        })
    return alerts


def detect_inventory_risks(conn) -> list:
    """Detect low inventory and stockout risks."""
    alerts = []
    low_stock = conn.execute("""
        SELECT supplier_name, product_name, sku, stock_status,
               quantity_available, lead_time_days
        FROM supplier_prices
        WHERE scraped_at > datetime('now', '-4 hours')
          AND stock_status IN ('low_stock', 'out_of_stock')
    """).fetchall()

    for supplier, product, sku, status, qty, lead_time in low_stock:
        severity = "HIGH" if status == "out_of_stock" else "MEDIUM"
        alerts.append({
            "type": "INVENTORY_RISK",
            "severity": severity,
            "category": "procurement",
            "description": f"{supplier} — {product} ({sku}): "
                           f"{status} (qty: {qty or 'N/A'}, "
                           f"lead time: {lead_time}d)"
        })
    return alerts


def detect_port_congestion_alerts(conn) -> list:
    """Detect critical port congestion levels."""
    alerts = []
    congested = conn.execute("""
        SELECT port_name, port_code, vessels_at_anchor,
               average_wait_days, berth_utilization_pct
        FROM port_congestion
        WHERE scraped_at > datetime('now', '-6 hours')
          AND (average_wait_days > 3 OR berth_utilization_pct > 90)
    """).fetchall()

    for port, code, vessels, wait, util in congested:
        severity = "HIGH" if wait > 7 or util > 95 else "MEDIUM"
        alerts.append({
            "type": "PORT_CONGESTION",
            "severity": severity,
            "category": "logistics",
            "description": f"{port} ({code}): {vessels} vessels at anchor, "
                           f"{wait:.1f}d avg wait, {util:.0f}% berth utilization"
        })
    return alerts
```
Step 5: AI-Powered Supply Chain Risk Analysis
Use GPT-4o to interpret disruptions and recommend mitigation strategies:
```python
import json  # needed to parse the model's JSON response below

from openai import OpenAI

client = OpenAI()


def analyze_supply_chain_risks(conn) -> dict:
    """Generate AI-powered supply chain risk assessment."""
    # Gather recent alerts
    alerts = conn.execute("""
        SELECT alert_type, severity, category, description
        FROM supply_chain_alerts
        WHERE created_at > datetime('now', '-48 hours')
        ORDER BY created_at DESC
        LIMIT 30
    """).fetchall()

    # Get delayed shipments
    delayed = conn.execute("""
        SELECT tracking_number, carrier, destination, status,
               estimated_arrival, delay_days
        FROM shipments
        WHERE scraped_at > datetime('now', '-6 hours')
          AND (status = 'delayed' OR delay_days > 0)
        ORDER BY delay_days DESC
        LIMIT 20
    """).fetchall()

    # Get price trends
    price_trends = conn.execute("""
        SELECT supplier_name, sku,
               AVG(CASE WHEN scraped_at > datetime('now', '-24 hours')
                        THEN unit_price END) as current_avg,
               AVG(CASE WHEN scraped_at BETWEEN datetime('now', '-7 days')
                                            AND datetime('now', '-24 hours')
                        THEN unit_price END) as week_avg
        FROM supplier_prices
        GROUP BY supplier_name, sku
        HAVING current_avg IS NOT NULL AND week_avg IS NOT NULL
    """).fetchall()

    # Get port status
    ports = conn.execute("""
        SELECT port_name, vessels_at_anchor, average_wait_days,
               berth_utilization_pct, congestion_level
        FROM port_congestion
        WHERE scraped_at > datetime('now', '-12 hours')
    """).fetchall()

    prompt = f"""Analyze this supply chain data and provide a comprehensive risk assessment.

ACTIVE ALERTS ({len(alerts)}):
{chr(10).join(f' [{a[1]}] {a[0]} ({a[2]}): {a[3]}' for a in alerts[:20]) if alerts else ' None'}

DELAYED SHIPMENTS ({len(delayed)}):
{chr(10).join(f' {d[1]} {d[0]} → {d[2]}: {d[3]}, ETA {d[4]}, +{d[5]}d late' for d in delayed[:15]) if delayed else ' None'}

SUPPLIER PRICE TRENDS:
{chr(10).join(f' {p[0]} {p[1]}: ${p[2]:.2f} now vs ${p[3]:.2f} last week ({((p[2]-p[3])/p[3]*100):+.1f}%)' for p in price_trends[:15]) if price_trends else ' No significant changes'}

PORT STATUS:
{chr(10).join(f' {p[0]}: {p[1]} at anchor, {p[2]:.1f}d wait, {p[3]:.0f}% util — {p[4]}' for p in ports) if ports else ' All clear'}

Provide:
1. RISK_SUMMARY — Overall supply chain risk level (LOW/MEDIUM/HIGH/CRITICAL) with explanation
2. CRITICAL_ISSUES — Top 3 issues requiring immediate attention
3. DELAYED_IMPACT — How current delays affect downstream operations
4. PRICE_RISKS — Procurement cost trends and recommendations
5. PORT_OUTLOOK — Logistics bottleneck forecast
6. MITIGATION — Specific actions to reduce risk (alternative suppliers, routing changes, buffer stock)
7. 7DAY_FORECAST — Expected supply chain conditions next week

Format as structured JSON with these keys."""

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "You are a supply chain risk analyst. "
             "Provide data-driven risk assessments with specific, actionable "
             "recommendations. Quantify impact in dollars and days when possible."},
            {"role": "user", "content": prompt}
        ],
        response_format={"type": "json_object"},
        temperature=0.3
    )
    return json.loads(response.choices[0].message.content)
```
Step 6: Automated Alerts & Daily Briefings
Send Slack notifications for disruptions and daily supply chain briefings:
```python
import requests as req
from datetime import date  # needed for the briefing date below

SLACK_WEBHOOK = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"


def send_supply_chain_alert(alert: dict):
    """Send a supply chain alert to Slack."""
    severity_emoji = {"HIGH": "🚨", "MEDIUM": "⚠️", "LOW": "ℹ️"}
    type_emoji = {
        "SHIPMENT_DELAY": "🚢",
        "PRICE_SPIKE": "📈",
        "INVENTORY_RISK": "📦",
        "PORT_CONGESTION": "⚓"
    }
    emoji = type_emoji.get(alert["type"], "📊")
    sev = severity_emoji.get(alert["severity"], "ℹ️")
    message = f"{sev} {emoji} *{alert['type']}*\n{alert['description']}"
    req.post(SLACK_WEBHOOK, json={"text": message})


def generate_daily_briefing(conn):
    """Generate and send a daily supply chain briefing."""
    # Count active shipments by status
    statuses = conn.execute("""
        SELECT status, COUNT(DISTINCT tracking_number)
        FROM shipments
        WHERE scraped_at > datetime('now', '-24 hours')
        GROUP BY status
    """).fetchall()

    # Count alerts by severity
    alert_counts = conn.execute("""
        SELECT severity, COUNT(*)
        FROM supply_chain_alerts
        WHERE created_at > datetime('now', '-24 hours')
        GROUP BY severity
    """).fetchall()

    # Supplier price summary
    price_changes = conn.execute("""
        SELECT COUNT(*) FROM supply_chain_alerts
        WHERE alert_type = 'PRICE_SPIKE'
          AND created_at > datetime('now', '-24 hours')
    """).fetchone()[0]

    status_lines = [f" {s}: {c}" for s, c in statuses]
    alert_lines = [f" {s}: {c}" for s, c in alert_counts]

    briefing = (
        f"📊 *Daily Supply Chain Briefing — {date.today()}*\n\n"
        f"*Shipment Status:*\n{''.join(chr(10) + s for s in status_lines)}\n\n"
        f"*Alerts (24h):*\n{''.join(chr(10) + a for a in alert_lines)}\n\n"
        f"*Price Alerts:* {price_changes} supplier price spikes detected\n"
    )

    # Run AI risk analysis
    risk = analyze_supply_chain_risks(conn)
    if risk:
        top = risk.get("CRITICAL_ISSUES", "None")
        if isinstance(top, list):
            top = top[0] if top else "None"
        briefing += (
            f"\n*AI Risk Assessment:* {risk.get('RISK_SUMMARY', 'N/A')}\n"
            f"*Top Issue:* {top}\n"
        )

    req.post(SLACK_WEBHOOK, json={"text": briefing})
```
Step 7: Putting It All Together — Supply Chain Monitoring Agent
```python
import schedule
import time
from datetime import datetime

# Configuration
TRACKED_SHIPMENTS = [
    {"carrier": "maersk", "tracking": "MAEU1234567"},
    {"carrier": "msc", "tracking": "MSCU7654321"},
    {"carrier": "ups", "tracking": "1Z999AA10123456784"},
]

SUPPLIER_CATALOGS = [
    {"name": "Shenzhen Electronics Co", "url": "https://supplier-a.com/catalog"},
    {"name": "Taiwan Semiconductor", "url": "https://supplier-b.com/products"},
    {"name": "Vietnam Packaging Ltd", "url": "https://supplier-c.com/pricing"},
]

PORT_MONITORS = [
    {"name": "Port of Los Angeles", "url": "https://www.portoflosangeles.org/"},
    {"name": "Port of Shanghai", "url": "https://www.portshanghai.com.cn/"},
    {"name": "Port of Rotterdam", "url": "https://www.portofrotterdam.com/"},
]


def run_shipment_monitoring():
    """Track all active shipments."""
    conn = init_supply_chain_db()
    for shipment in TRACKED_SHIPMENTS:
        result = scrape_shipment_status(shipment["carrier"], shipment["tracking"])
        if "error" not in result:
            store_shipment(conn, result)

    alerts = detect_shipment_delays(conn)
    for alert in alerts:
        send_supply_chain_alert(alert)
        conn.execute("""
            INSERT INTO supply_chain_alerts (alert_type, severity, category, description)
            VALUES (?, ?, ?, ?)
        """, (alert["type"], alert["severity"], alert["category"], alert["description"]))
    conn.commit()
    print(f"[{datetime.now()}] Shipment monitoring complete — "
          f"{len(TRACKED_SHIPMENTS)} shipments tracked")
    conn.close()


def run_supplier_monitoring():
    """Monitor supplier pricing and availability."""
    conn = init_supply_chain_db()
    for supplier in SUPPLIER_CATALOGS:
        prices = scrape_supplier_prices(supplier["url"], [])
        for price in prices:
            price["supplier_name"] = supplier["name"]
            store_supplier_price(conn, price)

    # Check for price spikes and inventory risks
    price_alerts = detect_price_spikes(conn)
    inventory_alerts = detect_inventory_risks(conn)
    for alert in price_alerts + inventory_alerts:
        send_supply_chain_alert(alert)
        conn.execute("""
            INSERT INTO supply_chain_alerts (alert_type, severity, category, description)
            VALUES (?, ?, ?, ?)
        """, (alert["type"], alert["severity"], alert["category"], alert["description"]))
    conn.commit()
    print(f"[{datetime.now()}] Supplier monitoring complete — "
          f"{len(SUPPLIER_CATALOGS)} suppliers checked")
    conn.close()


def run_port_monitoring():
    """Monitor port congestion levels."""
    conn = init_supply_chain_db()
    for port in PORT_MONITORS:
        data = scrape_port_congestion(port["url"])
        if data:
            data["port_name"] = port["name"]
            conn.execute("""
                INSERT INTO port_congestion (port_name, vessels_at_anchor,
                    average_wait_days, berth_utilization_pct, vessels_expected_7d,
                    congestion_level)
                VALUES (?, ?, ?, ?, ?, ?)
            """, (data.get("port_name"), data.get("vessels_at_anchor", 0),
                  data.get("average_wait_days", 0), data.get("berth_utilization_pct", 0),
                  data.get("vessels_expected_7d", 0),
                  "high" if data.get("average_wait_days", 0) > 3
                  else "moderate" if data.get("average_wait_days", 0) > 1
                  else "low"))
    conn.commit()

    alerts = detect_port_congestion_alerts(conn)
    for alert in alerts:
        send_supply_chain_alert(alert)
    print(f"[{datetime.now()}] Port monitoring complete — "
          f"{len(PORT_MONITORS)} ports checked")
    conn.close()


# Schedule monitoring
schedule.every(2).hours.do(run_shipment_monitoring)
schedule.every(6).hours.do(run_supplier_monitoring)
schedule.every(12).hours.do(run_port_monitoring)
schedule.every().day.at("07:00").do(
    lambda: generate_daily_briefing(init_supply_chain_db())
)

if __name__ == "__main__":
    print("🚢📦 Supply Chain Intelligence Agent — Starting...")
    run_shipment_monitoring()
    run_supplier_monitoring()
    run_port_monitoring()
    generate_daily_briefing(init_supply_chain_db())
    while True:
        schedule.run_pending()
        time.sleep(60)
```
Cost Comparison: Traditional vs. AI Agent Approach
| Solution | Monthly Cost | Coverage | Customization |
|---|---|---|---|
| FourKites | $5,000–$20,000 | Carrier tracking | Limited |
| project44 | $3,000–$15,000 | Visibility platform | Moderate |
| Resilinc | $10,000–$50,000 | Risk monitoring | Moderate |
| Coupa Supply Chain | $8,000–$30,000 | Procurement + risk | Enterprise |
| AI Agent + Mantis API | $29–$299 | All sources | Fully custom |
Use Cases by Industry Segment
1. Manufacturers
Manufacturers managing hundreds of SKUs across dozens of suppliers use this pipeline to detect component shortages before they halt production. Monitor supplier lead times, track inbound shipments, and get early warning when a critical part goes out of stock — with AI-recommended alternative suppliers.
2. E-Commerce & D2C Brands
Online retailers that source products from overseas need visibility into container shipments, customs delays, and warehouse inventory levels. An AI agent monitors the entire journey from factory floor to fulfillment center and predicts when stock will run out based on current sales velocity.
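The stockout prediction mentioned above reduces to simple arithmetic once you have a daily sales velocity. A minimal sketch, where the helper name and the velocity input are illustrative rather than part of the pipeline above:

```python
from typing import Optional


def days_until_stockout(quantity_available: int,
                        daily_sales_velocity: float,
                        inbound_qty: int = 0,
                        inbound_eta_days: Optional[int] = None) -> float:
    """Project days of cover from current stock and average daily sales.
    Hypothetical helper; velocity would come from your order history."""
    if daily_sales_velocity <= 0:
        return float("inf")  # no sales, no stockout
    days = quantity_available / daily_sales_velocity
    # Inbound stock only extends cover if it lands before the shelf runs dry
    if inbound_eta_days is not None and inbound_eta_days <= days:
        days += inbound_qty / daily_sales_velocity
    return days


# 1,200 units on hand, selling 80/day, 2,000 more landing in 10 days
print(days_until_stockout(1200, 80, inbound_qty=2000, inbound_eta_days=10))  # 40.0
```

Comparing this projection against supplier lead times from the `supplier_prices` table tells you whether a reorder placed today would arrive in time.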
3. Third-Party Logistics (3PLs)
3PLs managing shipments for multiple clients need multi-carrier visibility without buying enterprise subscriptions for each carrier. AI agents scrape tracking data from any carrier portal and provide a unified view across all shipments, with automated exception alerting.
4. Procurement Teams
Procurement professionals monitoring commodity prices, supplier availability, and contract compliance can automate the manual work of checking supplier catalogs and market prices. The AI analysis layer flags when it's time to renegotiate, switch suppliers, or build buffer stock.
Ethical Considerations & Best Practices
- Respect robots.txt: Check carrier and supplier site policies before scraping — many offer APIs for authorized partners
- Rate limiting: Carrier portals can be slow — use 15+ second intervals between requests
- Use official APIs when available: Major carriers (Maersk, UPS, FedEx) offer tracking APIs — prefer these for production use
- Data sensitivity: Supply chain data can reveal business strategy — secure your database and alerts
- Accuracy validation: Cross-reference AI-extracted data with known shipment details before acting on alerts
- Compliance: Some port and customs data has access restrictions — verify legal access rights
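The rate-limiting guideline above is easy to enforce with a small per-host throttle. This sketch (the class name is illustrative) sleeps just long enough to keep at least `min_interval` seconds between requests to the same host:

```python
import time


class Throttle:
    """Enforce a minimum interval between requests to the same host."""

    def __init__(self, min_interval: float = 15.0):
        self.min_interval = min_interval
        self._last: dict = {}  # host -> monotonic timestamp of last request

    def wait(self, host: str) -> None:
        """Block until at least min_interval has passed for this host."""
        elapsed = time.monotonic() - self._last.get(host, float("-inf"))
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self._last[host] = time.monotonic()


throttle = Throttle(min_interval=15.0)
# throttle.wait("www.maersk.com")  # call before each request to that portal
```

Tracking hosts separately means a slow carrier portal does not stall scrapes of unrelated supplier sites.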
Deployment Options
| Method | Best For | Cost |
|---|---|---|
| Cron job (VPS) | Simple scheduled monitoring | $5–$20/mo |
| AWS Lambda + EventBridge | Serverless, auto-scaling | $2–$30/mo |
| GitHub Actions | Free tier for light monitoring | Free–$10/mo |
| Docker + Kubernetes | Enterprise multi-source monitoring | $50+/mo |
Start Building Your Supply Chain Intelligence Agent
The WebPerception API handles JavaScript-heavy carrier portals, AI-powered data extraction, and structured output — so you can focus on building the intelligence layer that keeps your supply chain running smoothly.
Get Your API Key →
What's Next
Once your supply chain intelligence agent is running, consider these enhancements:
- Weather integration: Monitor severe weather along shipping routes to predict delays
- Geopolitical monitoring: Track trade policy changes, sanctions, and tariff updates that affect your supply chain
- Demand forecasting: Combine supply data with sales forecasts to optimize inventory levels
- Supplier scoring: Build automated reliability scores based on historical on-time delivery and price stability
- Carbon tracking: Monitor and optimize supply chain emissions by tracking routing and mode choices
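As a starting point for the supplier-scoring idea, a weighted blend of on-time delivery rate and price stability (one minus the coefficient of variation) yields a usable 0–100 score. The weights here are illustrative assumptions, not an industry standard:

```python
from statistics import mean, pstdev
from typing import List


def supplier_score(on_time_flags: List[bool],
                   unit_prices: List[float],
                   w_delivery: float = 0.6,
                   w_price: float = 0.4) -> float:
    """Blend on-time delivery rate with price stability into a 0-100 score.
    Illustrative weighting, tune to your own risk tolerance."""
    on_time_rate = (sum(on_time_flags) / len(on_time_flags)) if on_time_flags else 0.0
    if unit_prices and mean(unit_prices) > 0:
        # Coefficient of variation: lower spread means a steadier price
        cv = pstdev(unit_prices) / mean(unit_prices)
        price_stability = max(0.0, 1.0 - cv)
    else:
        price_stability = 0.0
    return round(100 * (w_delivery * on_time_rate + w_price * price_stability), 1)


# 9 of 10 deliveries on time, perfectly stable pricing
print(supplier_score([True] * 9 + [False], [10.0] * 5))  # 94.0
```

Both inputs can be derived from the historical `shipments` and `supplier_prices` tables, so the score improves automatically as the agent accumulates data.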
Related Articles
- The Complete Guide to Web Scraping with AI Agents
- Web Scraping for Price Monitoring: Build an AI-Powered Price Tracker
- Web Scraping for E-Commerce: Monitor Products, Prices & Reviews
- Web Scraping for Market Research: Analyze Competitors & Trends
- AI Agent Structured Data Extraction
- Build a Website Monitoring Agent