Architecture — willhaben-tracker

1. System Overview

A Docker Compose stack that monitors Austrian classifieds (willhaben.at) for price drops and new listings, then pushes real-time notifications via Telegram. Six services cooperate:

Service  Image                           Role                                                Port
db       postgres:15-alpine              Primary datastore; migrations on boot               55632 → 5432
rest     postgrest/postgrest:v12.2.0     Auto-generated REST API over the PostgreSQL schema  internal
kong     kong:2.8.1                      API gateway with JWT auth, CORS, request transform  55621 → 8000
studio   supabase/studio                 Web admin dashboard for database management         55630 → 3000
meta     supabase/postgres-meta:v0.84.2  Schema metadata service (feeds Studio)              internal
worker   custom (./worker/Dockerfile)    Long-polling Telegram bot + scraping scheduler      none

The worker is the only business-logic container. It connects directly to Postgres via asyncpg, bypassing PostgREST/Kong entirely. The Kong → PostgREST chain exists for the Studio dashboard and any future API clients.


2. Architecture Diagram

                 ┌──────────────┐
                 │  Telegram    │
                 │   Users      │
                 └──────┬───────┘
                        │ long-polling (HTTPS)
                        ▼
┌─────────────────────────────────────────────────────────────────┐
│                      Docker Network                             │
│                                                                 │
│  ┌──────────┐    asyncpg     ┌───────────┐                      │
│  │ worker   │◄──────────────►│   db      │                      │
│  │          │  scheduler +   │ postgres  │                      │
│  │ bot.py   │  notifier      │ 15        │                      │
│  │ main.py  │                └─────┬─────┘                      │
│  │scraper.py│                                  ┌───────┐        │
│  │          │                                  │ meta  │        │
│  └──────────┘                                  └───┬───┘        │
│                                        ┌───────────┤            │
│                                        ▼           ▼            │
│                                   ┌───────┐   ┌───────┐         │
│                                   │ rest  │   │ kong  │         │
│                                   │ Postg │◄─►│ 2.8   │         │
│                                   │ REST  │   │       │         │
│                                   └───────┘   └───┬───┘         │
│                                                   │             │
│                                          ┌────────▼────────┐    │
│                                          │     studio      │    │
│                                          │  (Supabase UI)  │    │
│                                          └─────────────────┘    │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
                        │ httpx GET
                        ▼
              ┌──────────────────────┐
              │  willhaben.at        │
              │  /webapi/ad-search/… │
              └──────────────────────┘

Data flow (scrape → notify):

  1. Scheduler polls search_queries for due queries.
  2. For each query, calls willhaben API via scraper.py.
  3. Upserts ads into ads, detects new/price-drop events.
  4. Sends Telegram messages via notifier.py (photo or text).
  5. Logs to scrape_logs and notifications.

3. Database Schema

Core tables

-- Whitelisted users; source of truth for access control
CREATE TABLE users (
    id          uuid PRIMARY KEY,
    telegram_id bigint UNIQUE NOT NULL,
    username    text,
    first_name  text,
    is_admin    boolean DEFAULT false,
    is_active   boolean DEFAULT true,
    created_at  timestamptz DEFAULT now()
);

-- Saved searches, each owned by one user; drives scheduler intervals
CREATE TABLE search_queries (
    id               uuid PRIMARY KEY,
    user_id          uuid REFERENCES users(id) ON DELETE CASCADE,
    keyword          text NOT NULL,
    interval_minutes int DEFAULT 60,
    is_active        boolean DEFAULT true,
    last_scraped_at  timestamptz,
    created_at       timestamptz DEFAULT now()
);

-- De-duplicated ad snapshots from willhaben
CREATE TABLE ads (
    id              uuid PRIMARY KEY,
    wh_ad_id        text UNIQUE NOT NULL,     -- willhaben's internal ID
    raw_json        jsonb NOT NULL,           -- full API response payload
    title           text NOT NULL,
    price           numeric,                  -- parsed float; null if unknown
    location        text,
    url             text,
    published_at    timestamptz,
    first_seen_at   timestamptz DEFAULT now(),
    main_image_url  text                      -- migration 02
);

-- Junction: which query discovered which ad (enables per-user dedup)
CREATE TABLE query_ads (
    search_query_id uuid REFERENCES search_queries(id) ON DELETE CASCADE,
    ad_id           uuid REFERENCES ads(id) ON DELETE CASCADE,
    first_seen_at   timestamptz DEFAULT now(),
    is_notified     boolean DEFAULT false,
    PRIMARY KEY (search_query_id, ad_id)
);

-- Audit log of every Telegram notification sent
CREATE TABLE notifications (
    id         uuid PRIMARY KEY,
    user_id    uuid REFERENCES users(id) ON DELETE CASCADE,
    ad_id      uuid REFERENCES ads(id) ON DELETE SET NULL,
    message_id int,                        -- Telegram message ID for reply/edit
    sent_at    timestamptz DEFAULT now()
);

-- Per-run health log; used for debugging and monitoring
CREATE TABLE scrape_logs (
    id              uuid PRIMARY KEY,
    search_query_id uuid REFERENCES search_queries(id) ON DELETE CASCADE,
    status          text CHECK (status IN ('success', 'error', 'rate_limited')),
    ads_found       int DEFAULT 0,
    new_ads         int DEFAULT 0,
    error_message   text,
    scraped_at      timestamptz DEFAULT now()
);

-- Price change history; migration 02
CREATE TABLE price_history (
    id         uuid PRIMARY KEY,
    ad_id      uuid REFERENCES ads(id) ON DELETE CASCADE,
    old_price  numeric NOT NULL,
    new_price  numeric NOT NULL,
    changed_at timestamptz DEFAULT now(),
    UNIQUE (ad_id, old_price, new_price)
);

Key indexes

Index                              Purpose
idx_search_queries_active_scraped  Partial index on (is_active=true, last_scraped_at); scheduler polling
idx_query_ads_notified             Partial index on (search_query_id, is_notified=false); notifier lookups
idx_notifications_user_sent        (user_id, sent_at DESC); per-user notification history
idx_scrape_logs_query_at           (search_query_id, scraped_at DESC); latest runs per query
idx_price_history_ad_id            Fast price timeline lookup per ad

4. Scheduler Loop

The scheduler runs as an infinite asyncio coroutine inside the worker container (main.py:22).

Cycle

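import asyncio

# fetch_ads, extract_ad_fields, notify_*, mark_notified and log_notification
# are the worker's own helpers; "..." marks details elided in this overview.
async def scheduler_loop(pool):
    while True:
        # 1. Poll DB for due queries
        rows = await pool.fetch(
            "SELECT sq.id, sq.keyword, sq.interval_minutes, sq.user_id, u.telegram_id "
            "FROM search_queries sq JOIN users u ON sq.user_id = u.id "
            "WHERE sq.is_active AND (sq.last_scraped_at IS NULL "
            "  OR sq.last_scraped_at < now() - sq.interval_minutes * interval '1 minute')"
        )

        # 2. Process each query
        for row in rows:
            query_id, user_id = row["id"], row["user_id"]
            ads_raw, _ = await fetch_ads(row["keyword"])    # call willhaben API
            for ad_data in ads_raw:
                fields = extract_ad_fields(ad_data)          # parse JSON → dict
                wh_ad_id = fields["wh_ad_id"]

                # 3. Global dedup on wh_ad_id (UNIQUE constraint)
                existing = await pool.fetchrow(
                    "SELECT id, price FROM ads WHERE wh_ad_id = $1", wh_ad_id
                )

                if existing is None:
                    # New ad: INSERT into ads and keep the new row's id
                    ad_uuid = ...
                else:
                    # Existing ad: check for a price drop
                    ad_uuid = existing["id"]
                    old_price, new_price = existing["price"], fields["price"]
                    if old_price is not None and new_price is not None and new_price < old_price:
                        await pool.execute(
                            "UPDATE ads SET price = $1 WHERE id = $2", new_price, ad_uuid
                        )
                        await pool.execute(
                            "INSERT INTO price_history (ad_id, old_price, new_price) "
                            "VALUES ($1, $2, $3) ON CONFLICT DO NOTHING",
                            ad_uuid, old_price, new_price,
                        )
                        await notify_price_drop(...)

                # 4. Query-level dedup via query_ads junction table
                existing_qa = await pool.fetchrow(
                    "SELECT 1 FROM query_ads WHERE search_query_id = $1 AND ad_id = $2",
                    query_id, ad_uuid,
                )
                if existing_qa is None:
                    await pool.execute(
                        "INSERT INTO query_ads (search_query_id, ad_id) VALUES ($1, $2)",
                        query_id, ad_uuid,
                    )
                    msg_id = await notify_new_ad(...)
                    await mark_notified(pool, query_id, ad_uuid)
                    await log_notification(pool, user_id, ad_uuid, msg_id)

            await pool.execute(
                "UPDATE search_queries SET last_scraped_at = now() WHERE id = $1", query_id
            )
            await pool.execute(
                "INSERT INTO scrape_logs (search_query_id, status, ads_found) "
                "VALUES ($1, 'success', $2)", query_id, len(ads_raw),
            )

        await asyncio.sleep(30)   # polling interval between cycles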

Key behaviors

  • Interval-driven: Each query has its own interval_minutes. The scheduler selects queries whose last_scraped_at is stale relative to their interval.
  • Global ad dedup: The ads.wh_ad_id UNIQUE constraint ensures an ad is stored once regardless of how many queries match it.
  • Per-query notification dedup: Even though the ad row is global, the query_ads junction table tracks which query has already surfaced the ad to its owner. A new ad triggers a notification only when the (search_query_id, ad_id) pair is first inserted.
  • Price drop detection: On every scrape cycle, existing ads are re-checked. If new_price < old_price, the row updates and a price_history record is created (ON CONFLICT DO NOTHING prevents duplicate entries for the same price transition).
  • Error resilience: Per-query exceptions log to scrape_logs(status='error') and continue processing remaining queries. Top-level exceptions trigger a 30-second backoff before retrying the full cycle.
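The interval check and the two dedup rules above can be written as small pure functions. This is a minimal sketch rather than the worker's actual code: is_due and classify_event are hypothetical names, and the database lookups are reduced to boolean inputs.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_due(last_scraped_at: Optional[datetime], interval_minutes: int,
           now: datetime) -> bool:
    """A query is due if it was never scraped or its interval has elapsed."""
    if last_scraped_at is None:
        return True
    return last_scraped_at < now - timedelta(minutes=interval_minutes)


def classify_event(ad_known: bool, pair_known: bool,
                   old_price: Optional[float],
                   new_price: Optional[float]) -> list[str]:
    """Return the events one scraped ad produces for one query.

    ad_known:   a row with this wh_ad_id already exists in ads
    pair_known: (search_query_id, ad_id) already exists in query_ads
    """
    events = []
    # Price drops only apply to stored ads whose old and new price are known.
    if (ad_known and old_price is not None and new_price is not None
            and new_price < old_price):
        events.append("price_drop")
    # A "new ad" notification fires the first time this query surfaces the ad.
    if not pair_known:
        events.append("new_ad")
    return events
```

Comparing with "is not None" instead of truthiness keeps a price of 0 from being treated as unknown.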

5. Notification Engine

Fan-Out Model

The scheduler scrapes each keyword exactly once per cycle, then fans out notifications to ALL active subscribers of that keyword. This eliminates duplicate API calls when multiple users track the same keyword.

Scheduler → scrape "iphone 15" once (30 ads)
                              ↓ detect events: 2 new, 1 price drop
               ┌──────────────┼──────────────┐
               ▼              ▼              ▼
          Subscriber A   Subscriber B   Subscriber C
          notify_new     notify_new     notify_new
          notify_drop    notify_drop    notify_drop

Delivery Mechanism

  • Each notification is a separate Telegram API call (photo with caption, or text-only fallback)
  • Subscribers are fetched in bulk per keyword: SELECT u.telegram_id FROM keyword_subscriptions ks JOIN users u ...
  • Fan-out uses asyncio.gather() for parallel delivery across subscribers
  • When a keyword has more than 10 subscribers, sends go out in batches with a 50 ms sleep between batches, to stay under Telegram's limit of roughly 30 messages per second
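The delivery rules above could look roughly like the following. It is a sketch under assumptions: fan_out and its send callback are hypothetical names, and the batch size of 10 is inferred from the subscriber threshold, not stated by the source.

```python
import asyncio
from typing import Awaitable, Callable, Iterable

BATCH_SIZE = 10        # assumption: inferred from the "> 10 subscribers" rule
BATCH_PAUSE_S = 0.05   # the 50 ms pause between batches


async def fan_out(chat_ids: Iterable[int],
                  send: Callable[[int], Awaitable[int]]) -> dict[int, object]:
    """Send to every subscriber; map each chat id to a message id or exception."""
    ids = list(chat_ids)
    results: dict[int, object] = {}
    for start in range(0, len(ids), BATCH_SIZE):
        batch = ids[start:start + BATCH_SIZE]
        # return_exceptions=True keeps one failed send from aborting the others.
        outcomes = await asyncio.gather(*(send(c) for c in batch),
                                        return_exceptions=True)
        results.update(zip(batch, outcomes))
        if start + BATCH_SIZE < len(ids):
            await asyncio.sleep(BATCH_PAUSE_S)   # only between batches
    return results
```

The caller can then log only the entries that are not exceptions, which matches a best-effort model without a retry queue.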

Event Types

Function           Trigger                                                  Header                    Photo?
notify_new_ad      Ad not yet in the ads table during this keyword's cycle  "🆕 New listing found!"   Yes, if main_image_url exists
notify_price_drop  Existing ad's price decreased                            "⚠️ Price drop!"          Yes, if main_image_url exists

Message Format (HTML)

<emoji> <event_type>!

<b>Title Here</b>

💰 1,234 €  📍 LOCATION, POSTCODE
Published: d.m.Y H:M | Modified: d.m.Y H:M
[View Ad →]   ← inline button
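A caption in this layout could be assembled as follows. This is an illustrative sketch: format_caption is a hypothetical name, "d.m.Y H:M" is read as the strftime pattern %d.%m.%Y %H:%M, and the "n/a" placeholder for an unknown price is an assumption. The inline button is a separate reply markup and is not part of the text.

```python
from datetime import datetime
from html import escape
from typing import Optional


def format_caption(header: str, title: str, price: Optional[float],
                   location: str, published: datetime,
                   modified: datetime) -> str:
    """Build the HTML caption for a notification message."""
    # Assumption: unknown prices are shown as "n/a".
    price_text = f"{price:,.0f} €" if price is not None else "n/a"
    stamp = "%d.%m.%Y %H:%M"
    return (
        f"{header}\n\n"
        # Escape user-controlled text so it cannot break Telegram's HTML mode.
        f"<b>{escape(title)}</b>\n\n"
        f"💰 {price_text}  📍 {escape(location)}\n"
        f"Published: {published.strftime(stamp)} | "
        f"Modified: {modified.strftime(stamp)}"
    )
```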

Initial Load Behavior

On the first scrape cycle after a keyword is created (keywords.initial_loaded = false), all ads are silently indexed into the database without sending any notifications. The flag is set to true after the first complete cycle. Subsequent cycles notify normally for new ads and price drops.

Delivery Guarantees & Error Handling

  • Best-effort: Failed sends to one subscriber don't block delivery to others
  • No retry queue: By design — keeps it simple for local deployment
  • Per-subscriber isolation: Each Telegram API call is wrapped in its own try/except
  • Audit logging: Every successful notification recorded in notifications(user_id, ad_id, message_id)

6. Telegram Bot

The bot uses python-telegram-bot 20.x or later (the asyncio-based API) with long polling (app.updater.start_polling()). It shares the same asyncio event loop as the scheduler.

Command set

Command                Audience    Description
/start                 Anyone      Auto-registers user in DB, shows help text
/add <keyword>         Registered  Creates a new search query; duplicate check via ILIKE
/list                  Registered  Lists all queries with status, interval, last scraped time
/pause <uuid>          Registered  Sets is_active = false for the query (owner-only)
/resume <uuid>         Registered  Sets is_active = true
/delete <uuid>         Registered  Hard-deletes a query (CASCADE removes query_ads rows)
/stats                 Registered  Aggregates: total queries, distinct ads tracked, notifications sent
/adduser <id> [admin]  Admin only  Inserts or activates a user; optional admin flag
/removeuser <id>       Admin only  Hard-deletes from users table (CASCADE)
/users                 Admin only  Lists all registered users with role and status

Authentication model

  • Auto-registration: /start creates a users row if the telegram_id doesn't exist. The user starts as is_active=true, is_admin=false.
  • Whitelist check: Every command (except /start) calls _require_user(), which verifies the sender exists in DB and is active. Inactive users receive "Account deactivated."
  • Admin gate: Admin commands call _require_admin(), querying for is_admin=true AND is_active=true.

7. Scraper

API endpoint

GET https://www.willhaben.at/webapi/ad-search/search/atz/seo/kaufen-und-verkaufen/marktplatz
    ?keyword=<search>
    &rows=30
    &sort=1

Parameter  Value               Effect
keyword    user-provided text  Full-text search on willhaben
rows       30                  Maximum results per page
sort       1                   Sort by newest first

Only page 1 is fetched. The design trades completeness for speed and API politeness — the newest 30 ads are typically sufficient for early-bird notifications.
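For illustration, the page-1 request URL can be built with the standard library so that user-provided keywords are URL-encoded. build_search_url is a hypothetical helper name; the endpoint and parameters are the ones listed above.

```python
from urllib.parse import urlencode

SEARCH_URL = (
    "https://www.willhaben.at/webapi/ad-search/search/atz/seo/"
    "kaufen-und-verkaufen/marktplatz"
)


def build_search_url(keyword: str, rows: int = 30, sort: int = 1) -> str:
    """Return the page-1 search URL for a keyword."""
    # urlencode escapes spaces and special characters in the keyword.
    query = urlencode({"keyword": keyword, "rows": rows, "sort": sort})
    return f"{SEARCH_URL}?{query}"
```

An HTTP client such as httpx can also take the parameters as a dict and do the encoding itself; the explicit version just makes the final URL visible.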

Headers

{
    "accept": "application/json",
    "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
    "x-wh-client": "api@willhaben.at;responsive_web;server;1.0.0;desktop",
}

Retry logic

Three attempts with exponential backoff: the scraper waits 2 s (2^1) after the first failure and 4 s (2^2) after the second. On the third failure, the exception propagates to the scheduler's error handler.
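A generic version of this retry policy might look like the sketch below. with_retries is a hypothetical name, and the injectable sleep parameter exists only to make the delays testable; the real scraper may structure this differently.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def with_retries(
    call: Callable[[], Awaitable[T]],
    attempts: int = 3,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Run call(); after the n-th failure wait 2**n seconds, then retry."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return await call()
        except Exception:
            if attempt == attempts:
                raise                  # final failure: propagate to the caller
            await sleep(2 ** attempt)  # 2 s, then 4 s for three attempts
    raise RuntimeError("unreachable")
```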

Attribute parsing

The willhaben API returns ad attributes as a flat list of {name, values[]} objects. The parser extracts these into a dict keyed by attribute name:

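def _parse_attributes(ad_dict: dict) -> dict[str, str]:
    """Flatten willhaben's list of {name, values[]} objects into a dict."""
    result = {}
    # Tolerate ads without an attributes block instead of raising KeyError.
    for attr in ad_dict.get("attributes", {}).get("attribute", []):
        name = attr.get("name")
        values = attr.get("values", [])
        if name and values:
            result[name] = values[0]   # take first value only
    return result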

Key attribute mappings:

  • HEADING → ad title (fallback to description)
  • PRICE/AMOUNT → numeric price (commas stripped, parsed as float)
  • LOCATION → display location string
  • SEO_URL → reconstructs full URL as /iad/{seo_url}
  • PUBLISHED_String / CHANGED_String → ISO 8601 timestamp

The main image is the referenceImageUrl of the first entry in advertImageList.advertImage.
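The mappings above could be implemented along these lines. This is a hedged sketch: the helper names are hypothetical, and the host prepended to /iad/{seo_url} is assumed to be www.willhaben.at, taken from the API endpoint.

```python
from typing import Optional

BASE_URL = "https://www.willhaben.at"   # assumption: same host as the API


def parse_price(raw: Optional[str]) -> Optional[float]:
    """Strip commas and parse as float; None when missing or unparseable."""
    if not raw:
        return None
    try:
        return float(raw.replace(",", ""))
    except ValueError:
        return None


def build_ad_url(seo_url: str) -> str:
    """Rebuild the public ad URL from the SEO_URL attribute."""
    return f"{BASE_URL}/iad/{seo_url.lstrip('/')}"


def first_image_url(ad: dict) -> Optional[str]:
    """Return referenceImageUrl of the first image, if the ad has any."""
    images = (ad.get("advertImageList") or {}).get("advertImage") or []
    return images[0].get("referenceImageUrl") if images else None
```

Returning None for an unparseable price fits the nullable ads.price column, and a missing image leads to the text-only notification fallback.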


8. Key Decisions and Tradeoffs

Single worker container

Both the Telegram bot (long polling) and the scraping scheduler run as concurrent asyncio tasks within one Python process. This avoids inter-service communication complexity, reduces resource overhead (one container vs. two), and simplifies shared state — both coroutines use the same asyncpg.Pool. The tradeoff is that a crash in either component takes down the other; however, Docker's restart: unless-stopped policy recovers automatically.
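One way to run both components on a single event loop, while making sure a crash in either ends the process so that Docker can restart it, is sketched below. run_worker is a hypothetical name, and the two coroutines stand in for the real bot and scheduler entry points.

```python
import asyncio
from typing import Coroutine


async def run_worker(bot: Coroutine, scheduler: Coroutine) -> None:
    """Run both coroutines concurrently; the first crash stops both."""
    tasks = [asyncio.create_task(bot), asyncio.create_task(scheduler)]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_EXCEPTION
    )
    # Cancel the surviving task and wait for its cancellation to finish.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    # Re-raise the crash so the process exits with a non-zero status.
    for task in done:
        task.result()
```

Letting the exception escape, instead of swallowing it, is what allows the restart policy to take over.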

Global ad dedup (ads.wh_ad_id UNIQUE)

An ad discovered by multiple users' queries shares a single row. This saves storage and ensures price history tracks the true market price of the listing, not per-user observations. The cost is that the query_ads junction table must be consulted to determine whether this particular user's query has already seen the ad — adding an extra DB lookup per ad per cycle.

DB-driven whitelist vs. env-var list

Users are stored in PostgreSQL rather than a static allowlist in environment variables or config files. Benefits:

  • Admin commands (/adduser, /removeuser) manage access without redeploying the container.
  • Per-user data (search queries, notification history) is naturally relational.
  • Role-based authorization (is_admin flag) is a simple column check rather than application logic branching on env var patterns.

The tradeoff is that an attacker with DB write access could grant themselves admin; however, the worker container only reads/writes through parameterized queries — there's no exposed SQL interface. The PostgREST/Kong layer handles external API auth separately.

Page-1-only scraping

Fetching only 30 ads per cycle (page 1, sorted newest) keeps API calls lightweight and reduces latency per scheduler tick. For a marketplace like willhaben where new listings appear continuously, the first page captures fresh inventory. Missing older pages means ads that were posted before the user's query was created won't be backfilled — but the /add command is designed for forward-looking monitoring, not historical discovery.

asyncpg direct connection (bypassing PostgREST)

The worker talks to Postgres directly rather than routing through Kong → PostgREST. This avoids HTTP overhead on the hot path (every ad per every query per cycle), gives full SQL flexibility (complex JOINs in the scheduler poll, conditional UPDATE/INSERT patterns), and eliminates JWT token management for internal traffic. PostgREST remains available for the Studio dashboard and any future REST clients.

Global keyword dedup (keywords + keyword_subscriptions)

Multiple users can subscribe to the same keyword without duplicating API calls or ad storage. One scrape feeds all subscribers via fan-out notification. The cost is that a subscriber cannot independently control their scrape interval — it's shared per keyword (set by the first user, adjustable only while no other users are subscribed). This trades per-user granularity for API efficiency and consistent price tracking across all observers of the same listing.
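The grouping behind this tradeoff can be illustrated in a few lines. It is only a sketch: the function names are hypothetical, and the normalization rule (trim, collapse whitespace, lowercase) is an assumption, since the source does not say how keywords are compared beyond a case-insensitive ILIKE check in /add.

```python
from collections import defaultdict
from typing import Iterable


def normalize_keyword(keyword: str) -> str:
    """Assumed rule: trim, collapse inner whitespace, lowercase."""
    return " ".join(keyword.split()).lower()


def group_subscribers(
    subscriptions: Iterable[tuple[int, str]],
) -> dict[str, list[int]]:
    """Map each distinct keyword to the telegram ids subscribed to it."""
    groups: dict[str, list[int]] = defaultdict(list)
    for telegram_id, keyword in subscriptions:
        groups[normalize_keyword(keyword)].append(telegram_id)
    return dict(groups)
```

With this grouping, the scheduler makes one API call per dictionary key and fans the result out to the ids in the list.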