Web Scraping for Supply Chain & Logistics: How AI Agents Track Shipments, Inventory & Supplier Data in 2026

Published March 11, 2026 · 16 min read

Supply chain visibility is a $4.2 billion market — and it exists because most companies can't see what's happening across their supply chain in real time. Shipments get delayed without warning. Supplier prices change without notice. Port congestion causes cascading disruptions that take weeks to untangle.

Enterprise visibility platforms like FourKites, project44, and Resilinc charge $5,000–$50,000/month to solve this problem. They aggregate carrier data, monitor disruptions, and provide dashboards. But they're expensive, rigid, and often don't cover niche suppliers or regional carriers.

What if you could build your own supply chain intelligence system using AI agents? One that scrapes carrier tracking pages, monitors supplier pricing, detects port congestion, and alerts you to disruptions — all for a fraction of the cost?

In this guide, you'll build exactly that using Python, the WebPerception API, and AI-powered analysis.

Why Supply Chains Need Web Scraping

Modern supply chains are fragmented across dozens of systems that don't talk to each other: carrier tracking portals, supplier catalogs and pricing pages, port authority websites, warehouse management systems, and commodity price feeds.

No single platform covers all of these sources. AI agents can bridge the gap by scraping data from any of them and unifying it into a single intelligence layer.

Architecture: AI-Powered Supply Chain Intelligence Pipeline

Here's the complete system we'll build:

  1. Source Discovery — Identify carrier portals, supplier sites, port authorities, and commodity feeds to monitor
  2. AI Extraction — Scrape shipment status, pricing, inventory levels, and congestion data with structured extraction
  3. Storage & Tracking — SQLite database with historical data for trend analysis and anomaly detection
  4. Disruption Detection — Identify shipment delays, price spikes, low inventory, and port congestion automatically
  5. AI Analysis — LLM-powered risk assessment, impact analysis, and alternative supplier recommendations
  6. Alerts & Reports — Slack notifications for disruptions, daily supply chain briefings

Step 1: Define Your Supply Chain Data Models

First, structure the data you need to extract from supply chain sources:

from pydantic import BaseModel
from typing import Optional, List
from datetime import date, datetime

class ShipmentStatus(BaseModel):
    """Structured shipment tracking data from carrier portals."""
    tracking_number: str
    carrier: str
    origin: str
    destination: str
    current_location: str
    status: str  # "in_transit", "at_port", "customs_hold", "delivered", "delayed"
    estimated_arrival: Optional[date] = None
    original_eta: Optional[date] = None
    delay_days: int = 0
    vessel_name: Optional[str] = None
    container_id: Optional[str] = None
    last_update: Optional[datetime] = None
    milestones: List[str] = []

class SupplierPrice(BaseModel):
    """Structured supplier pricing and availability data."""
    supplier_name: str
    product_name: str
    sku: str
    unit_price: float
    currency: str = "USD"
    moq: int  # minimum order quantity
    lead_time_days: int
    stock_status: str  # "in_stock", "low_stock", "out_of_stock", "made_to_order"
    quantity_available: Optional[int] = None
    last_updated: Optional[date] = None
    certifications: List[str] = []  # "ISO9001", "FDA", "CE", etc.

class InventoryLevel(BaseModel):
    """Warehouse inventory tracking data."""
    warehouse_location: str
    sku: str
    product_name: str
    quantity_on_hand: int
    quantity_reserved: int
    quantity_available: int
    reorder_point: int
    days_of_supply: float
    last_receipt_date: Optional[date] = None
    next_expected_receipt: Optional[date] = None

class PortCongestion(BaseModel):
    """Port congestion and vessel schedule data."""
    port_name: str
    port_code: str
    vessels_at_anchor: int
    average_wait_days: float
    berth_utilization_pct: float
    vessels_expected_7d: int
    congestion_level: str  # "low", "moderate", "high", "critical"
    last_updated: Optional[datetime] = None
    notable_delays: List[str] = []

Step 2: Scrape Supply Chain Data with AI Extraction

The WebPerception API handles complex carrier portals and supplier sites that require JavaScript rendering:

import requests
import json
from datetime import date

MANTIS_API_KEY = "your-api-key"
BASE_URL = "https://api.mantisapi.com"

def scrape_shipment_status(carrier: str, tracking_number: str) -> dict:
    """Scrape shipment status from a carrier tracking portal."""

    # Map carriers to their tracking URLs
    carrier_urls = {
        "maersk": f"https://www.maersk.com/tracking/{tracking_number}",
        "msc": f"https://www.msc.com/track-a-shipment?trackingNumber={tracking_number}",
        "cosco": f"https://elines.coscoshipping.com/ebtracking/public/containers/{tracking_number}",
        "hapag_lloyd": f"https://www.hapag-lloyd.com/en/online-business/track/track-by-booking-solution.html?blno={tracking_number}",
        "ups": f"https://www.ups.com/track?tracknum={tracking_number}",
        "fedex": f"https://www.fedex.com/fedextrack/?trknbr={tracking_number}",
    }

    url = carrier_urls.get(carrier.lower())
    if not url:
        return {"error": f"Unknown carrier: {carrier}"}

    response = requests.post(
        f"{BASE_URL}/extract",
        headers={"Authorization": f"Bearer {MANTIS_API_KEY}"},
        json={
            "url": url,
            "schema": {
                "type": "object",
                "properties": {
                    "tracking_number": {"type": "string"},
                    "current_location": {"type": "string"},
                    "status": {"type": "string"},
                    "estimated_arrival": {"type": "string"},
                    "vessel_name": {"type": "string"},
                    "container_id": {"type": "string"},
                    "milestones": {
                        "type": "array",
                        "items": {"type": "string"}
                    }
                }
            },
            "wait_for": "networkidle",
            "timeout": 30000
        }
    )

    if response.ok:
        data = response.json()
        result = data.get("extracted", {})
        result["carrier"] = carrier
        result["tracking_number"] = tracking_number
        return result

    return {"error": response.text}


def scrape_supplier_prices(supplier_url: str, products: list | None = None) -> list:
    """Scrape current pricing and availability from a supplier catalog.

    The products argument is reserved for per-SKU filtering and is currently unused.
    """

    response = requests.post(
        f"{BASE_URL}/extract",
        headers={"Authorization": f"Bearer {MANTIS_API_KEY}"},
        json={
            "url": supplier_url,
            "schema": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "product_name": {"type": "string"},
                        "sku": {"type": "string"},
                        "unit_price": {"type": "number"},
                        "currency": {"type": "string"},
                        "moq": {"type": "integer"},
                        "lead_time_days": {"type": "integer"},
                        "stock_status": {"type": "string"},
                        "quantity_available": {"type": "integer"},
                        "certifications": {
                            "type": "array",
                            "items": {"type": "string"}
                        }
                    }
                }
            },
            "wait_for": "networkidle",
            "timeout": 30000
        }
    )

    if response.ok:
        return response.json().get("extracted", [])
    return []


def scrape_port_congestion(port_url: str) -> dict:
    """Scrape port congestion data from port authority websites."""

    response = requests.post(
        f"{BASE_URL}/extract",
        headers={"Authorization": f"Bearer {MANTIS_API_KEY}"},
        json={
            "url": port_url,
            "schema": {
                "type": "object",
                "properties": {
                    "port_name": {"type": "string"},
                    "vessels_at_anchor": {"type": "integer"},
                    "average_wait_days": {"type": "number"},
                    "berth_utilization_pct": {"type": "number"},
                    "vessels_expected_7d": {"type": "integer"},
                    "notable_delays": {
                        "type": "array",
                        "items": {"type": "string"}
                    }
                }
            },
            "wait_for": "networkidle",
            "timeout": 30000
        }
    )

    if response.ok:
        return response.json().get("extracted", {})
    return {}
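Carrier portals typically return ETAs as strings. Before storing results, a small helper (hypothetical, assuming ISO-formatted dates) can derive the delay_days field used by the models in Step 1:

```python
from datetime import date

def compute_delay_days(original_eta: str, estimated_arrival: str) -> int:
    """Days the current ETA has slipped past the original one (ISO dates)."""
    original = date.fromisoformat(original_eta)
    current = date.fromisoformat(estimated_arrival)
    return max((current - original).days, 0)  # early arrivals count as zero delay

print(compute_delay_days("2026-03-01", "2026-03-05"))  # 4
```

Normalizing dates before insertion keeps the delay-detection SQL in Step 4 simple, since string comparison on ISO dates sorts chronologically.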

Step 3: Store Supply Chain Data

Track everything in SQLite for historical analysis and trend detection:

import json
import sqlite3
from datetime import datetime

def init_supply_chain_db(db_path: str = "supply_chain_intel.db"):
    """Initialize the supply chain intelligence database."""
    conn = sqlite3.connect(db_path)
    c = conn.cursor()

    c.execute("""CREATE TABLE IF NOT EXISTS shipments (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        tracking_number TEXT,
        carrier TEXT,
        origin TEXT,
        destination TEXT,
        current_location TEXT,
        status TEXT,
        estimated_arrival DATE,
        original_eta DATE,
        delay_days INTEGER DEFAULT 0,
        vessel_name TEXT,
        container_id TEXT,
        milestones TEXT
    )""")

    c.execute("""CREATE TABLE IF NOT EXISTS supplier_prices (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        supplier_name TEXT,
        product_name TEXT,
        sku TEXT,
        unit_price REAL,
        currency TEXT DEFAULT 'USD',
        moq INTEGER,
        lead_time_days INTEGER,
        stock_status TEXT,
        quantity_available INTEGER
    )""")

    c.execute("""CREATE TABLE IF NOT EXISTS port_congestion (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        port_name TEXT,
        port_code TEXT,
        vessels_at_anchor INTEGER,
        average_wait_days REAL,
        berth_utilization_pct REAL,
        vessels_expected_7d INTEGER,
        congestion_level TEXT
    )""")

    c.execute("""CREATE TABLE IF NOT EXISTS supply_chain_alerts (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        alert_type TEXT,
        severity TEXT,
        category TEXT,
        description TEXT,
        metadata TEXT
    )""")

    c.execute("CREATE INDEX IF NOT EXISTS idx_ship_tracking ON shipments(tracking_number)")
    c.execute("CREATE INDEX IF NOT EXISTS idx_supplier_sku ON supplier_prices(sku, supplier_name)")
    c.execute("CREATE INDEX IF NOT EXISTS idx_port_name ON port_congestion(port_name)")

    conn.commit()
    return conn


def store_shipment(conn, shipment: dict):
    """Store a shipment tracking record."""
    conn.execute("""
        INSERT INTO shipments (tracking_number, carrier, origin, destination,
            current_location, status, estimated_arrival, original_eta,
            delay_days, vessel_name, container_id, milestones)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, (
        shipment.get("tracking_number"), shipment.get("carrier"),
        shipment.get("origin"), shipment.get("destination"),
        shipment.get("current_location"), shipment.get("status"),
        shipment.get("estimated_arrival"), shipment.get("original_eta"),
        shipment.get("delay_days", 0), shipment.get("vessel_name"),
        shipment.get("container_id"),
        json.dumps(shipment.get("milestones", []))
    ))
    conn.commit()


def store_supplier_price(conn, price: dict):
    """Store a supplier price record."""
    conn.execute("""
        INSERT INTO supplier_prices (supplier_name, product_name, sku,
            unit_price, currency, moq, lead_time_days, stock_status,
            quantity_available)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, (
        price.get("supplier_name"), price.get("product_name"),
        price.get("sku"), price.get("unit_price"),
        price.get("currency", "USD"), price.get("moq"),
        price.get("lead_time_days"), price.get("stock_status"),
        price.get("quantity_available")
    ))
    conn.commit()

Step 4: Detect Supply Chain Disruptions

Automated anomaly detection catches problems before they cascade:

def detect_shipment_delays(conn) -> list:
    """Detect shipments that are delayed or at risk."""
    alerts = []

    # Find shipments where ETA has slipped
    delayed = conn.execute("""
        SELECT DISTINCT s1.tracking_number, s1.carrier, s1.destination,
               s1.estimated_arrival, s1.status, s1.current_location
        FROM shipments s1
        INNER JOIN (
            SELECT tracking_number, MAX(scraped_at) as latest
            FROM shipments
            GROUP BY tracking_number
        ) s2 ON s1.tracking_number = s2.tracking_number
            AND s1.scraped_at = s2.latest
        WHERE s1.status IN ('delayed', 'customs_hold')
           OR s1.delay_days > 2
    """).fetchall()

    for tracking, carrier, dest, eta, status, location in delayed:
        severity = "HIGH" if status == "customs_hold" else "MEDIUM"
        alerts.append({
            "type": "SHIPMENT_DELAY",
            "severity": severity,
            "category": "logistics",
            "description": f"{carrier} {tracking} to {dest}: "
                          f"status={status}, currently at {location}, "
                          f"ETA: {eta}"
        })

    return alerts
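The MAX(scraped_at) self-join in that query is the key pattern: every scrape appends a new row, so you must join back to the most recent snapshot per tracking number. Here it is in isolation on an in-memory database:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE shipments (tracking_number TEXT, scraped_at TEXT, status TEXT)")
rows = [
    ("MAEU1234567", "2026-03-10 08:00:00", "in_transit"),
    ("MAEU1234567", "2026-03-11 08:00:00", "delayed"),   # the latest record wins
    ("MSCU7654321", "2026-03-11 08:00:00", "in_transit"),
]
conn.executemany("INSERT INTO shipments VALUES (?, ?, ?)", rows)

# Latest snapshot per tracking number: join each row against its MAX(scraped_at)
latest = conn.execute("""
    SELECT s1.tracking_number, s1.status
    FROM shipments s1
    INNER JOIN (
        SELECT tracking_number, MAX(scraped_at) AS latest
        FROM shipments GROUP BY tracking_number
    ) s2 ON s1.tracking_number = s2.tracking_number AND s1.scraped_at = s2.latest
""").fetchall()
print(sorted(latest))  # [('MAEU1234567', 'delayed'), ('MSCU7654321', 'in_transit')]
```

Without this join, stale rows from earlier scrapes would keep re-triggering alerts for shipments that have since recovered.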


def detect_price_spikes(conn) -> list:
    """Detect supplier price increases >15%."""
    alerts = []

    spikes = conn.execute("""
        SELECT sp1.supplier_name, sp1.product_name, sp1.sku,
               sp1.unit_price as current_price,
               sp2.avg_price as previous_avg,
               ((sp1.unit_price - sp2.avg_price) / sp2.avg_price * 100) as change_pct
        FROM supplier_prices sp1
        INNER JOIN (
            SELECT sku, supplier_name, AVG(unit_price) as avg_price
            FROM supplier_prices
            WHERE scraped_at < datetime('now', '-24 hours')
              AND scraped_at > datetime('now', '-14 days')
            GROUP BY sku, supplier_name
        ) sp2 ON sp1.sku = sp2.sku AND sp1.supplier_name = sp2.supplier_name
        WHERE sp1.scraped_at > datetime('now', '-4 hours')
          AND sp1.unit_price > sp2.avg_price * 1.15
    """).fetchall()

    for supplier, product, sku, current, previous, change in spikes:
        alerts.append({
            "type": "PRICE_SPIKE",
            "severity": "HIGH" if change > 25 else "MEDIUM",
            "category": "procurement",
            "description": f"{supplier} — {product} ({sku}): "
                          f"${previous:.2f} → ${current:.2f} ({change:+.1f}%)"
        })

    return alerts


def detect_inventory_risks(conn) -> list:
    """Detect low inventory and stockout risks."""
    alerts = []

    low_stock = conn.execute("""
        SELECT supplier_name, product_name, sku, stock_status,
               quantity_available, lead_time_days
        FROM supplier_prices
        WHERE scraped_at > datetime('now', '-4 hours')
          AND stock_status IN ('low_stock', 'out_of_stock')
    """).fetchall()

    for supplier, product, sku, status, qty, lead_time in low_stock:
        severity = "HIGH" if status == "out_of_stock" else "MEDIUM"
        alerts.append({
            "type": "INVENTORY_RISK",
            "severity": severity,
            "category": "procurement",
            "description": f"{supplier} — {product} ({sku}): "
                          f"{status} (qty: {qty or 'N/A'}, "
                          f"lead time: {lead_time}d)"
        })

    return alerts


def detect_port_congestion_alerts(conn) -> list:
    """Detect critical port congestion levels."""
    alerts = []

    congested = conn.execute("""
        SELECT port_name, port_code, vessels_at_anchor,
               average_wait_days, berth_utilization_pct
        FROM port_congestion
        WHERE scraped_at > datetime('now', '-6 hours')
          AND (average_wait_days > 3 OR berth_utilization_pct > 90)
    """).fetchall()

    for port, code, vessels, wait, util in congested:
        severity = "HIGH" if wait > 7 or util > 95 else "MEDIUM"
        alerts.append({
            "type": "PORT_CONGESTION",
            "severity": severity,
            "category": "logistics",
            "description": f"{port} ({code}): {vessels} vessels at anchor, "
                          f"{wait:.1f}d avg wait, {util:.0f}% berth utilization"
        })

    return alerts
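The congestion_level value stored in Step 7 is derived inline from average wait times. Extracted as a helper, the same tiers look like this (a sketch; the "critical" tier above 7 days is an assumption matching the HIGH-severity cutoff used here):

```python
def congestion_level(average_wait_days: float) -> str:
    """Map average anchor wait to the congestion tiers used in this guide."""
    if average_wait_days > 7:
        return "critical"  # assumption: mirrors the HIGH-severity wait cutoff
    if average_wait_days > 3:
        return "high"
    if average_wait_days > 1:
        return "moderate"
    return "low"

print(congestion_level(4.5))  # high
```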

Step 5: AI-Powered Supply Chain Risk Analysis

Use GPT-4o to interpret disruptions and recommend mitigation strategies:

from openai import OpenAI

client = OpenAI()

def analyze_supply_chain_risks(conn) -> dict:
    """Generate AI-powered supply chain risk assessment."""

    # Gather recent alerts
    alerts = conn.execute("""
        SELECT alert_type, severity, category, description
        FROM supply_chain_alerts
        WHERE created_at > datetime('now', '-48 hours')
        ORDER BY created_at DESC
        LIMIT 30
    """).fetchall()

    # Get delayed shipments
    delayed = conn.execute("""
        SELECT tracking_number, carrier, destination, status,
               estimated_arrival, delay_days
        FROM shipments
        WHERE scraped_at > datetime('now', '-6 hours')
          AND (status = 'delayed' OR delay_days > 0)
        ORDER BY delay_days DESC
        LIMIT 20
    """).fetchall()

    # Get price trends
    price_trends = conn.execute("""
        SELECT supplier_name, sku,
               AVG(CASE WHEN scraped_at > datetime('now', '-24 hours')
                   THEN unit_price END) as current_avg,
               AVG(CASE WHEN scraped_at BETWEEN datetime('now', '-7 days')
                   AND datetime('now', '-24 hours')
                   THEN unit_price END) as week_avg
        FROM supplier_prices
        GROUP BY supplier_name, sku
        HAVING current_avg IS NOT NULL AND week_avg IS NOT NULL
    """).fetchall()

    # Get port status
    ports = conn.execute("""
        SELECT port_name, vessels_at_anchor, average_wait_days,
               berth_utilization_pct, congestion_level
        FROM port_congestion
        WHERE scraped_at > datetime('now', '-12 hours')
    """).fetchall()

    prompt = f"""Analyze this supply chain data and provide a comprehensive risk assessment.

ACTIVE ALERTS ({len(alerts)}):
{chr(10).join(f'  [{a[1]}] {a[0]} ({a[2]}): {a[3]}' for a in alerts[:20]) if alerts else '  None'}

DELAYED SHIPMENTS ({len(delayed)}):
{chr(10).join(f'  {d[1]} {d[0]} → {d[2]}: {d[3]}, ETA {d[4]}, +{d[5]}d late' for d in delayed[:15]) if delayed else '  None'}

SUPPLIER PRICE TRENDS:
{chr(10).join(f'  {p[0]} {p[1]}: ${p[2]:.2f} now vs ${p[3]:.2f} last week ({((p[2]-p[3])/p[3]*100):+.1f}%)' for p in price_trends[:15]) if price_trends else '  No significant changes'}

PORT STATUS:
{chr(10).join(f'  {p[0]}: {p[1]} at anchor, {p[2]:.1f}d wait, {p[3]:.0f}% util — {p[4]}' for p in ports) if ports else '  All clear'}

Provide:
1. RISK_SUMMARY — Overall supply chain risk level (LOW/MEDIUM/HIGH/CRITICAL) with explanation
2. CRITICAL_ISSUES — Top 3 issues requiring immediate attention
3. DELAYED_IMPACT — How current delays affect downstream operations
4. PRICE_RISKS — Procurement cost trends and recommendations
5. PORT_OUTLOOK — Logistics bottleneck forecast
6. MITIGATION — Specific actions to reduce risk (alternative suppliers, routing changes, buffer stock)
7. 7DAY_FORECAST — Expected supply chain conditions next week

Format as structured JSON with these keys."""

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "You are a supply chain risk analyst. "
             "Provide data-driven risk assessments with specific, actionable "
             "recommendations. Quantify impact in dollars and days when possible."},
            {"role": "user", "content": prompt}
        ],
        response_format={"type": "json_object"},
        temperature=0.3
    )

    return json.loads(response.choices[0].message.content)
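Even with response_format set to json_object, LLM output is worth parsing defensively: the model may return CRITICAL_ISSUES as a bare string rather than a list, which breaks naive indexing downstream. A hedged normalization helper (hypothetical) could look like:

```python
import json

def parse_risk_response(raw: str) -> dict:
    """Parse the model's JSON risk assessment, normalizing fragile fields."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return {"RISK_SUMMARY": "UNKNOWN (unparseable model output)", "CRITICAL_ISSUES": []}
    issues = data.get("CRITICAL_ISSUES", [])
    if isinstance(issues, str):
        issues = [issues]  # coerce a bare string into a one-item list
    data["CRITICAL_ISSUES"] = issues
    return data

print(parse_risk_response('{"RISK_SUMMARY": "LOW", "CRITICAL_ISSUES": "Port delay"}'))
```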

Step 6: Automated Alerts & Daily Briefings

Send Slack notifications for disruptions and daily supply chain briefings:

import requests as req
from datetime import date

SLACK_WEBHOOK = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"

def send_supply_chain_alert(alert: dict):
    """Send a supply chain alert to Slack."""
    severity_emoji = {"HIGH": "🚨", "MEDIUM": "⚠️", "LOW": "ℹ️"}
    type_emoji = {
        "SHIPMENT_DELAY": "🚢",
        "PRICE_SPIKE": "📈",
        "INVENTORY_RISK": "📦",
        "PORT_CONGESTION": "⚓"
    }

    emoji = type_emoji.get(alert["type"], "📊")
    sev = severity_emoji.get(alert["severity"], "ℹ️")

    message = f"{sev} {emoji} *{alert['type']}*\n{alert['description']}"
    req.post(SLACK_WEBHOOK, json={"text": message})


def generate_daily_briefing(conn):
    """Generate and send a daily supply chain briefing."""
    # Count active shipments by status
    statuses = conn.execute("""
        SELECT status, COUNT(DISTINCT tracking_number)
        FROM shipments
        WHERE scraped_at > datetime('now', '-24 hours')
        GROUP BY status
    """).fetchall()

    # Count alerts by severity
    alert_counts = conn.execute("""
        SELECT severity, COUNT(*)
        FROM supply_chain_alerts
        WHERE created_at > datetime('now', '-24 hours')
        GROUP BY severity
    """).fetchall()

    # Supplier price summary
    price_changes = conn.execute("""
        SELECT COUNT(*) FROM supply_chain_alerts
        WHERE alert_type = 'PRICE_SPIKE'
        AND created_at > datetime('now', '-24 hours')
    """).fetchone()[0]

    status_lines = [f"  {s}: {c}" for s, c in statuses]
    alert_lines = [f"  {s}: {c}" for s, c in alert_counts]

    briefing = (
        f"📊 *Daily Supply Chain Briefing — {date.today()}*\n\n"
        f"*Shipment Status:*\n{''.join(chr(10) + s for s in status_lines)}\n\n"
        f"*Alerts (24h):*\n{''.join(chr(10) + a for a in alert_lines)}\n\n"
        f"*Price Alerts:* {price_changes} supplier price spikes detected\n"
    )

    # Run AI risk analysis
    risk = analyze_supply_chain_risks(conn)
    if risk:
        issues = risk.get("CRITICAL_ISSUES") or ["None"]
        if isinstance(issues, str):
            issues = [issues]
        briefing += (
            f"\n*AI Risk Assessment:* {risk.get('RISK_SUMMARY', 'N/A')}\n"
            f"*Top Issue:* {issues[0]}\n"
        )

    req.post(SLACK_WEBHOOK, json={"text": briefing})

Step 7: Putting It All Together — Supply Chain Monitoring Agent

import schedule
import time
from datetime import date, datetime

# Configuration
TRACKED_SHIPMENTS = [
    {"carrier": "maersk", "tracking": "MAEU1234567"},
    {"carrier": "msc", "tracking": "MSCU7654321"},
    {"carrier": "ups", "tracking": "1Z999AA10123456784"},
]

SUPPLIER_CATALOGS = [
    {"name": "Shenzhen Electronics Co", "url": "https://supplier-a.com/catalog"},
    {"name": "Taiwan Semiconductor", "url": "https://supplier-b.com/products"},
    {"name": "Vietnam Packaging Ltd", "url": "https://supplier-c.com/pricing"},
]

PORT_MONITORS = [
    {"name": "Port of Los Angeles", "url": "https://www.portoflosangeles.org/"},
    {"name": "Port of Shanghai", "url": "https://www.portshanghai.com.cn/"},
    {"name": "Port of Rotterdam", "url": "https://www.portofrotterdam.com/"},
]


def run_shipment_monitoring():
    """Track all active shipments."""
    conn = init_supply_chain_db()

    for shipment in TRACKED_SHIPMENTS:
        result = scrape_shipment_status(shipment["carrier"], shipment["tracking"])
        if "error" not in result:
            store_shipment(conn, result)

    alerts = detect_shipment_delays(conn)
    for alert in alerts:
        send_supply_chain_alert(alert)
        conn.execute("""
            INSERT INTO supply_chain_alerts (alert_type, severity, category, description)
            VALUES (?, ?, ?, ?)
        """, (alert["type"], alert["severity"], alert["category"], alert["description"]))
        conn.commit()

    print(f"[{datetime.now()}] Shipment monitoring complete — "
          f"{len(TRACKED_SHIPMENTS)} shipments tracked")
    conn.close()


def run_supplier_monitoring():
    """Monitor supplier pricing and availability."""
    conn = init_supply_chain_db()

    for supplier in SUPPLIER_CATALOGS:
        prices = scrape_supplier_prices(supplier["url"], [])
        for price in prices:
            price["supplier_name"] = supplier["name"]
            store_supplier_price(conn, price)

    # Check for price spikes and inventory risks
    price_alerts = detect_price_spikes(conn)
    inventory_alerts = detect_inventory_risks(conn)

    for alert in price_alerts + inventory_alerts:
        send_supply_chain_alert(alert)
        conn.execute("""
            INSERT INTO supply_chain_alerts (alert_type, severity, category, description)
            VALUES (?, ?, ?, ?)
        """, (alert["type"], alert["severity"], alert["category"], alert["description"]))
        conn.commit()

    print(f"[{datetime.now()}] Supplier monitoring complete — "
          f"{len(SUPPLIER_CATALOGS)} suppliers checked")
    conn.close()


def run_port_monitoring():
    """Monitor port congestion levels."""
    conn = init_supply_chain_db()

    for port in PORT_MONITORS:
        data = scrape_port_congestion(port["url"])
        if data:
            data["port_name"] = port["name"]
            conn.execute("""
                INSERT INTO port_congestion (port_name, vessels_at_anchor,
                    average_wait_days, berth_utilization_pct, vessels_expected_7d,
                    congestion_level)
                VALUES (?, ?, ?, ?, ?, ?)
            """, (data.get("port_name"), data.get("vessels_at_anchor", 0),
                  data.get("average_wait_days", 0), data.get("berth_utilization_pct", 0),
                  data.get("vessels_expected_7d", 0),
                  "high" if data.get("average_wait_days", 0) > 3 else "moderate"
                  if data.get("average_wait_days", 0) > 1 else "low"))
            conn.commit()

    alerts = detect_port_congestion_alerts(conn)
    for alert in alerts:
        send_supply_chain_alert(alert)

    print(f"[{datetime.now()}] Port monitoring complete — "
          f"{len(PORT_MONITORS)} ports checked")
    conn.close()


# Schedule monitoring
schedule.every(2).hours.do(run_shipment_monitoring)
schedule.every(6).hours.do(run_supplier_monitoring)
schedule.every(12).hours.do(run_port_monitoring)
schedule.every().day.at("07:00").do(
    lambda: generate_daily_briefing(init_supply_chain_db())
)

if __name__ == "__main__":
    print("🚢📦 Supply Chain Intelligence Agent — Starting...")
    run_shipment_monitoring()
    run_supplier_monitoring()
    run_port_monitoring()
    generate_daily_briefing(init_supply_chain_db())

    while True:
        schedule.run_pending()
        time.sleep(60)

Cost Comparison: Traditional vs. AI Agent Approach

| Solution | Monthly Cost | Coverage | Customization |
| --- | --- | --- | --- |
| FourKites | $5,000–$20,000 | Carrier tracking | Limited |
| project44 | $3,000–$15,000 | Visibility platform | Moderate |
| Resilinc | $10,000–$50,000 | Risk monitoring | Moderate |
| Coupa Supply Chain | $8,000–$30,000 | Procurement + risk | Enterprise |
| AI Agent + WebPerception API | $29–$299 | All sources | Fully custom |
💡 Key advantage: Enterprise supply chain platforms require 6–12 month implementations and lock you into their data sources. An AI agent approach lets you monitor any carrier, any supplier, any port — and add new sources in minutes, not months. Plus, AI-powered analysis provides strategic insights that traditional dashboards can't.

Use Cases by Industry Segment

1. Manufacturers

Manufacturers managing hundreds of SKUs across dozens of suppliers use this pipeline to detect component shortages before they halt production. Monitor supplier lead times, track inbound shipments, and get early warning when a critical part goes out of stock — with AI-recommended alternative suppliers.

2. E-Commerce & D2C Brands

Online retailers that source products from overseas need visibility into container shipments, customs delays, and warehouse inventory levels. An AI agent monitors the entire journey from factory floor to fulfillment center and predicts when stock will run out based on current sales velocity.
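Stockout prediction can start very simply: divide available stock by sales velocity and compare against supplier lead time (a minimal sketch; the function names and 7-day safety buffer are illustrative):

```python
def days_of_supply(quantity_available: int, daily_sales_velocity: float) -> float:
    """Projected days until stockout at the current sales rate."""
    if daily_sales_velocity <= 0:
        return float("inf")
    return quantity_available / daily_sales_velocity

def should_reorder(qty: int, velocity: float, lead_time_days: int, buffer_days: int = 7) -> bool:
    """Reorder when remaining cover won't outlast lead time plus a safety buffer."""
    return days_of_supply(qty, velocity) <= lead_time_days + buffer_days

print(days_of_supply(450, 30.0))      # 15.0
print(should_reorder(450, 30.0, 14))  # True: 15 days of cover vs a 21-day horizon
```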

3. Third-Party Logistics (3PLs)

3PLs managing shipments for multiple clients need multi-carrier visibility without buying enterprise subscriptions for each carrier. AI agents scrape tracking data from any carrier portal and provide a unified view across all shipments, with automated exception alerting.

4. Procurement Teams

Procurement professionals monitoring commodity prices, supplier availability, and contract compliance can automate the manual work of checking supplier catalogs and market prices. The AI analysis layer flags when it's time to renegotiate, switch suppliers, or build buffer stock.

Ethical Considerations & Best Practices

Scrape responsibly: respect each site's robots.txt and terms of service, rate-limit requests so you don't burden carrier portals, and prefer official APIs where carriers offer them. Only collect data you're authorized to access, and get written permission before automating any supplier portal that requires a login.

Deployment Options

| Method | Best For | Cost |
| --- | --- | --- |
| Cron job (VPS) | Simple scheduled monitoring | $5–$20/mo |
| AWS Lambda + EventBridge | Serverless, auto-scaling | $2–$30/mo |
| GitHub Actions | Free tier for light monitoring | Free–$10/mo |
| Docker + Kubernetes | Enterprise multi-source monitoring | $50+/mo |

Start Building Your Supply Chain Intelligence Agent

The WebPerception API handles JavaScript-heavy carrier portals, AI-powered data extraction, and structured output — so you can focus on building the intelligence layer that keeps your supply chain running smoothly.

Get Your API Key →

What's Next

Once your supply chain intelligence agent is running, you can extend it with additional carriers and suppliers, demand forecasting built on the historical data you're already collecting, and integration with your ERP or order-management system.
