20 KiB
Architecture — willhaben-tracker
1. System Overview
A Docker Compose stack that monitors Austrian classifieds (willhaben.at) for price drops and new listings, then pushes real-time notifications via Telegram. Six services cooperate:
| Service | Image | Role | Port |
|---|---|---|---|
db |
postgres:15-alpine |
Primary datastore; migrations on boot | 55632 → 5432 |
rest |
postgrest/postgrest:v12.2.0 |
Auto-generated REST API over the PostgreSQL schema | internal |
kong |
kong:2.8.1 |
API gateway with JWT auth, CORS, request transform | 55621 → 8000 |
studio |
supabase/studio |
Web admin dashboard for database management | 55630 → 3000 |
meta |
supabase/postgres-meta:v0.84.2 |
Schema metadata service (feeds Studio) | internal |
worker |
custom (./worker/Dockerfile) |
Long-polling Telegram bot + scraping scheduler | none |
The worker is the only business-logic container. It connects directly to Postgres via asyncpg, bypassing PostgREST/Kong entirely. The Kong → PostgREST chain exists for the Studio dashboard and any future API clients.
2. Architecture Diagram
┌──────────────┐
│ Telegram │
│ Users │
└──────┬───────┘
│ long-polling (HTTPS)
▼
┌─────────────────────────────────────────────────────────────────┐
│ Docker Network │
│ │
│ ┌──────────┐ asyncpg ┌───────────┐ │
│ │ worker │◄──────────────►│ db │ │
│ │ │ scheduler + │ postgres │ │
│ │ bot.py │ notifier │ 15 │ │
│ │ main.py │ └─────┬─────┘ │
│ │ scraper.py ┌───────┐ │
│ │ │ meta │ │
│ └──────────┘ └───┬───┘ │
│ ┌───────────┤ │
│ ▼ ▼ │
│ ┌───────┐ ┌───────┐ │
│ │ rest │ │ kong │ │
│ │Postgre│◄─►│ 2.8 │ │
│ │ REST │ │ │ │
│ └───────┘ └───┬───┘ │
│ │ │
│ ┌────────▼────────┐ │
│ │ studio │ │
│ │ (Supabase UI) │ │
│ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
│ httpx GET
▼
┌──────────────────────┐
│ willhaben.at │
│ /webapi/ad-search/… │
└──────────────────────┘
Data flow (scrape → notify):
- Scheduler polls
search_queriesfor due queries. - For each query, calls willhaben API via
scraper.py. - Upserts ads into
ads, detects new/price-drop events. - Sends Telegram messages via
notifier.py(photo or text). - Logs to
scrape_logsandnotifications.
3. Database Schema
Core tables
-- Whitelisted users; source of truth for access control
users (
id uuid PRIMARY KEY,
telegram_id bigint UNIQUE NOT NULL,
username text,
first_name text,
is_admin boolean DEFAULT false,
is_active boolean DEFAULT true,
created_at timestamptz DEFAULT now()
)
-- One saved search per user; drives scheduler intervals
search_queries (
id uuid PRIMARY KEY,
user_id uuid REFERENCES users(id) ON DELETE CASCADE,
keyword text NOT NULL,
interval_minutes int DEFAULT 60,
is_active boolean DEFAULT true,
last_scraped_at timestamptz,
created_at timestamptz DEFAULT now()
)
-- De-duplicated ad snapshots from willhaben
ads (
id uuid PRIMARY KEY,
wh_ad_id text UNIQUE NOT NULL, -- willhaben's internal ID
raw_json jsonb NOT NULL, -- full API response payload
title text NOT NULL,
price numeric, -- parsed float; null if unknown
location text,
url text,
published_at timestamptz,
first_seen_at timestamptz DEFAULT now(),
main_image_url text -- migration 02
)
-- Junction: which query discovered which ad (enables per-user dedup)
query_ads (
search_query_id uuid REFERENCES search_queries(id) ON DELETE CASCADE,
ad_id uuid REFERENCES ads(id) ON DELETE CASCADE,
first_seen_at timestamptz DEFAULT now(),
is_notified boolean DEFAULT false,
PRIMARY KEY (search_query_id, ad_id)
)
-- Audit log of every Telegram notification sent
notifications (
id uuid PRIMARY KEY,
user_id uuid REFERENCES users(id) ON DELETE CASCADE,
ad_id uuid REFERENCES ads(id) ON DELETE SET NULL,
message_id int, -- Telegram message ID for reply/edit
sent_at timestamptz DEFAULT now()
)
-- Per-run health log; used for debugging and monitoring
scrape_logs (
id uuid PRIMARY KEY,
search_query_id uuid REFERENCES search_queries(id) ON DELETE CASCADE,
status text CHECK (status IN ('success', 'error', 'rate_limited')),
ads_found int DEFAULT 0,
new_ads int DEFAULT 0,
error_message text,
scraped_at timestamptz DEFAULT now()
)
-- Price change history; migration 02
price_history (
id uuid PRIMARY KEY,
ad_id uuid REFERENCES ads(id) ON DELETE CASCADE,
old_price numeric NOT NULL,
new_price numeric NOT NULL,
changed_at timestamptz DEFAULT now(),
UNIQUE (ad_id, old_price, new_price)
)
Key indexes
| Index | Purpose |
|---|---|
idx_search_queries_active_scraped |
Partial index on (is_active=true, last_scraped_at) — scheduler polling |
idx_query_ads_notified |
Partial on (search_query_id, is_notified=false) — notifier lookups |
idx_notifications_user_sent |
(user_id, sent_at DESC) — per-user notification history |
idx_scrape_logs_query_at |
(search_query_id, scraped_at DESC) — latest runs per query |
idx_price_history_ad_id |
Fast price timeline lookup per ad |
4. Scheduler Loop
The scheduler runs as an infinite asyncio coroutine inside the worker container (main.py:22).
Cycle
while True:
# 1. Poll DB for due queries
rows = pool.fetch(
"SELECT sq.id, sq.keyword, sq.interval_minutes, u.telegram_id"
"FROM search_queries sq JOIN users u ON sq.user_id = u.id"
"WHERE sq.is_active AND (last_scraped_at IS NULL"
" OR last_scraped_at < now() - interval_minutes)"
)
# 2. Process each query
for row in rows:
ads_raw, _ = await fetch_ads(keyword) # call willhaben API
for ad_data in ads_raw:
fields = extract_ad_fields(ad_data) # parse JSON → dict
wh_ad_id = fields["wh_ad_id"]
# 3. Global dedup on wh_ad_id (UNIQUE constraint)
existing = pool.fetchrow("SELECT id FROM ads WHERE wh_ad_id = $1", wh_ad_id)
if not existing:
# New ad — INSERT, notify
...
else:
# Existing ad — check price drop
old_price = existing["price"]
new_price = fields["price"]
if old_price and new_price and new_price < old_price:
UPDATE ads SET price = $1
INSERT INTO price_history ON CONFLICT DO NOTHING
await notify_price_drop(...)
# 4. Query-level dedup via query_ads junction table
existing_qa = pool.fetchrow(
"SELECT 1 FROM query_ads WHERE search_query_id=$1 AND ad_id=$2", ...
)
if not existing_qa:
INSERT INTO query_ads (...)
await notify_new_ad(...)
mark_notified(pool, query_id, ad_uuid)
log_notification(pool, user_id, ad_uuid, msg_id)
UPDATE search_queries SET last_scraped_at = now() WHERE id = $1
INSERT INTO scrape_logs (status='success', ...)
await asyncio.sleep(30) # polling interval between cycles
Key behaviors
- Interval-driven: Each query has its own
interval_minutes. The scheduler selects queries whoselast_scraped_atis stale relative to their interval. - Global ad dedup: The
ads.wh_ad_id UNIQUEconstraint ensures an ad is stored once regardless of how many queries match it. - Per-query notification dedup: Even though the ad row is global, the
query_adsjunction table tracks which query has already surfaced the ad to its owner. A new ad triggers a notification only when the(search_query_id, ad_id)pair is first inserted. - Price drop detection: On every scrape cycle, existing ads are re-checked. If
new_price < old_price, the row updates and aprice_historyrecord is created (ON CONFLICT DO NOTHINGprevents duplicate entries for the same price transition). - Error resilience: Per-query exceptions log to
scrape_logs(status='error')and continue processing remaining queries. Top-level exceptions trigger a 30-second backoff before retrying the full cycle.
5. Notification Engine
Fan-Out Model
The scheduler scrapes each keyword exactly once per cycle, then fans out notifications to ALL active subscribers of that keyword. This eliminates duplicate API calls when multiple users track the same keyword.
Scheduler → scrape "iphone 15" once (30 ads)
↓ detect events: 2 new, 1 price drop
┌──────────────┼──────────────┐
▼ ▼ ▼
Subscriber A Subscriber B Subscriber C
notify_new notify_new notify_new
notify_drop notify_drop notify_drop
Delivery Mechanism
- Each notification is a separate Telegram API call (photo with caption, or text-only fallback)
- Subscribers are fetched in bulk per keyword:
SELECT u.telegram_id FROM keyword_subscriptions ks JOIN users u ... - Fan-out uses
asyncio.gather()for parallel delivery across subscribers - If subscriber count > 10, a 50ms sleep is inserted between each batch to respect Telegram's 30 msg/sec rate limit
Event Types
| Function | Trigger | Header | Photo? |
|---|---|---|---|
notify_new_ad |
Ad not in ads table for this keyword cycle | "🆕 New listing found!" | Yes, if main_image_url exists |
notify_price_drop |
Existing ad price decreased | "⚠️ Price drop!" | Yes, if main_image_url exists |
Message Format (HTML)
<emoji> <event_type>!
<b>Title Here</b>
💰 1,234 € 📍 LOCATION, POSTCODE
Published: d.m.Y H:M | Modified: d.m.Y H:M
[View Ad →] ← inline button
Initial Load Behavior
On the first scrape cycle after a keyword is created (keywords.initial_loaded = false), all ads are silently indexed into the database without sending any notifications. The flag is set to true after the first complete cycle. Subsequent cycles notify normally for new ads and price drops.
Delivery Guarantees & Error Handling
- Best-effort: Failed sends to one subscriber don't block delivery to others
- No retry queue: By design — keeps it simple for local deployment
- Per-subscriber isolation: Each Telegram API call is wrapped in its own try/except
- Audit logging: Every successful notification recorded in
notifications(user_id, ad_id, message_id)
6. Telegram Bot
The bot uses python-telegram-bot v2x with long polling (app.updater.start_polling()). It shares the same asyncio event loop as the scheduler.
Command set
| Command | Audience | Description |
|---|---|---|
/start |
Anyone | Auto-registers user in DB, shows help text |
/add <keyword> |
Registered | Creates a new search query; duplicate check via ILIKE |
/list |
Registered | Lists all queries with status, interval, last scraped time |
/pause <uuid> |
Registered | Sets is_active = false for the query (owner-only) |
/resume <uuid> |
Registered | Sets is_active = true |
/delete <uuid> |
Registered | Hard-deletes a query (CASCADE removes query_ads rows) |
/stats |
Registered | Aggregates: total queries, distinct ads tracked, notifications sent |
/adduser <id> [admin] |
Admin only | Inserts or activates a user; optional admin flag |
/removeuser <id> |
Admin only | Hard-deletes from users table (CASCADE) |
/users |
Admin only | Lists all registered users with role and status |
Authentication model
- Auto-registration:
/startcreates ausersrow if thetelegram_iddoesn't exist. The user starts asis_active=true,is_admin=false. - Whitelist check: Every command (except
/start) calls_require_user(), which verifies the sender exists in DB and is active. Inactive users receive "Account deactivated." - Admin gate: Admin commands call
_require_admin(), querying foris_admin=true AND is_active=true.
7. Scraper
API endpoint
GET https://www.willhaben.at/webapi/ad-search/search/atz/seo/kaufen-und-verkaufen/marktplatz
?keyword=<search>
&rows=30
&sort=1
| Parameter | Value | Effect |
|---|---|---|
keyword |
user-provided text | Full-text search on willhaben |
rows |
30 |
Maximum results per page |
sort |
1 |
Sort by newest first |
Only page 1 is fetched. The design trades completeness for speed and API politeness — the newest 30 ads are typically sufficient for early-bird notifications.
Headers
{
"accept": "application/json",
"user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
"x-wh-client": "api@willhaben.at;responsive_web;server;1.0.0;desktop",
}
Retry logic
Three attempts with exponential backoff: 2^1s, 2^2s. On third failure, the exception propagates to the scheduler's error handler.
Attribute parsing
The willhaben API returns ad attributes as a flat list of {name, values[]} objects. The parser extracts these into a dict keyed by attribute name:
def _parse_attributes(ad_dict) -> dict[str, str]:
result = {}
for attr in ad_dict["attributes"]["attribute"]:
name = attr.get("name")
values = attr.get("values", [])
if name and values:
result[name] = values[0] # take first value only
return result
Key attribute mappings:
HEADING→ ad title (fallback todescription)PRICE/AMOUNT→ numeric price (commas stripped, parsed as float)LOCATION→ display location stringSEO_URL→ reconstructs full URL as/iad/{seo_url}PUBLISHED_String/CHANGED_String→ ISO 8601 timestamp
The main image is extracted from the first entry in advertImageList.advertImage.referenceImageUrl.
8. Key Decisions and Tradeoffs
Single worker container
Both the Telegram bot (long polling) and the scraping scheduler run as concurrent asyncio tasks within one Python process. This avoids inter-service communication complexity, reduces resource overhead (one container vs. two), and simplifies shared state — both coroutines use the same asyncpg.Pool. The tradeoff is that a crash in either component takes down the other; however, Docker's restart: unless-stopped policy recovers automatically.
Global ad dedup (ads.wh_ad_id UNIQUE)
An ad discovered by multiple users' queries shares a single row. This saves storage and ensures price history tracks the true market price of the listing, not per-user observations. The cost is that the query_ads junction table must be consulted to determine whether this particular user's query has already seen the ad — adding an extra DB lookup per ad per cycle.
DB-driven whitelist vs. env-var list
Users are stored in PostgreSQL rather than a static allowlist in environment variables or config files. Benefits:
- Admin commands (
/adduser,/removeuser) manage access without redeploying the container. - Per-user data (search queries, notification history) is naturally relational.
- Role-based authorization (
is_adminflag) is a simple column check rather than application logic branching on env var patterns.
The tradeoff is that an attacker with DB write access could grant themselves admin; however, the worker container only reads/writes through parameterized queries — there's no exposed SQL interface. The PostgREST/Kong layer handles external API auth separately.
Page-1-only scraping
Fetching only 30 ads per cycle (page 1, sorted newest) keeps API calls lightweight and reduces latency per scheduler tick. For a marketplace like willhaben where new listings appear continuously, the first page captures fresh inventory. Missing older pages means ads that were posted before the user's query was created won't be backfilled — but the /add command is designed for forward-looking monitoring, not historical discovery.
asyncpg direct connection (bypassing PostgREST)
The worker talks to Postgres directly rather than routing through Kong → PostgREST. This avoids HTTP overhead on the hot path (every ad per every query per cycle), gives full SQL flexibility (complex JOINs in the scheduler poll, conditional UPDATE/INSERT patterns), and eliminates JWT token management for internal traffic. PostgREST remains available for the Studio dashboard and any future REST clients.
Global keyword dedup (keywords + keyword_subscriptions)
Multiple users can subscribe to the same keyword without duplicating API calls or ad storage. One scrape feeds all subscribers via fan-out notification. The cost is that a subscriber cannot independently control their scrape interval — it's shared per keyword (set by the first user, adjustable only while no other users are subscribed). This trades per-user granularity for API efficiency and consistent price tracking across all observers of the same listing.