399 lines
20 KiB
Markdown
399 lines
20 KiB
Markdown
# Architecture — willhaben-tracker
|
|
|
|
## 1. System Overview
|
|
|
|
A Docker Compose stack that monitors Austrian classifieds (willhaben.at) for price drops and new listings, then pushes real-time notifications via Telegram. Six services cooperate:
|
|
|
|
| Service | Image | Role | Port |
|
|
|------------|--------------------------------------|-------------------------------------------------|-------------|
|
|
| `db` | `postgres:15-alpine` | Primary datastore; migrations on boot | 55632 → 5432|
|
|
| `rest` | `postgrest/postgrest:v12.2.0` | Auto-generated REST API over the PostgreSQL schema | internal |
|
|
| `kong` | `kong:2.8.1` | API gateway with JWT auth, CORS, request transform | 55621 → 8000|
|
|
| `studio` | `supabase/studio` | Web admin dashboard for database management | 55630 → 3000|
|
|
| `meta` | `supabase/postgres-meta:v0.84.2` | Schema metadata service (feeds Studio) | internal |
|
|
| `worker` | custom (`./worker/Dockerfile`) | Long-polling Telegram bot + scraping scheduler | none |
|
|
|
|
The **worker** is the only business-logic container. It connects directly to Postgres via `asyncpg`, bypassing PostgREST/Kong entirely. The Kong → PostgREST chain exists for the Studio dashboard and any future API clients.
|
|
|
|
---
|
|
|
|
## 2. Architecture Diagram
|
|
|
|
```
|
|
┌──────────────┐
|
|
│ Telegram │
|
|
│ Users │
|
|
└──────┬───────┘
|
|
│ long-polling (HTTPS)
|
|
▼
|
|
┌─────────────────────────────────────────────────────────────────┐
|
|
│ Docker Network │
|
|
│ │
|
|
│ ┌──────────┐ asyncpg ┌───────────┐ │
|
|
│ │ worker │◄──────────────►│ db │ │
|
|
│ │ │ scheduler + │ postgres │ │
|
|
│ │ bot.py │ notifier │ 15 │ │
|
|
│ │ main.py │ └─────┬─────┘ │
|
|
│ │ scraper.py ┌───────┐ │
|
|
│ │ │ meta │ │
|
|
│ └──────────┘ └───┬───┘ │
|
|
│ ┌───────────┤ │
|
|
│ ▼ ▼ │
|
|
│ ┌───────┐ ┌───────┐ │
|
|
│ │ rest │ │ kong │ │
|
|
│ │Postgre│◄─►│ 2.8 │ │
|
|
│ │ REST │ │ │ │
|
|
│ └───────┘ └───┬───┘ │
|
|
│ │ │
|
|
│ ┌────────▼────────┐ │
|
|
│ │ studio │ │
|
|
│ │ (Supabase UI) │ │
|
|
│ └─────────────────┘ │
|
|
│ │
|
|
└─────────────────────────────────────────────────────────────────┘
|
|
│ httpx GET
|
|
▼
|
|
┌──────────────────────┐
|
|
│ willhaben.at │
|
|
│ /webapi/ad-search/… │
|
|
└──────────────────────┘
|
|
```
|
|
|
|
**Data flow (scrape → notify):**
|
|
1. Scheduler polls `search_queries` for due queries.
|
|
2. For each query, calls willhaben API via `scraper.py`.
|
|
3. Upserts ads into `ads`, detects new/price-drop events.
|
|
4. Sends Telegram messages via `notifier.py` (photo or text).
|
|
5. Logs to `scrape_logs` and `notifications`.
|
|
|
|
---
|
|
|
|
## 3. Database Schema
|
|
|
|
### Core tables
|
|
|
|
```sql
|
|
-- Whitelisted users; source of truth for access control
|
|
users (
|
|
id uuid PRIMARY KEY,
|
|
telegram_id bigint UNIQUE NOT NULL,
|
|
username text,
|
|
first_name text,
|
|
is_admin boolean DEFAULT false,
|
|
is_active boolean DEFAULT true,
|
|
created_at timestamptz DEFAULT now()
|
|
)
|
|
|
|
-- One saved search per user; drives scheduler intervals
|
|
search_queries (
|
|
id uuid PRIMARY KEY,
|
|
user_id uuid REFERENCES users(id) ON DELETE CASCADE,
|
|
keyword text NOT NULL,
|
|
interval_minutes int DEFAULT 60,
|
|
is_active boolean DEFAULT true,
|
|
last_scraped_at timestamptz,
|
|
created_at timestamptz DEFAULT now()
|
|
)
|
|
|
|
-- De-duplicated ad snapshots from willhaben
|
|
ads (
|
|
id uuid PRIMARY KEY,
|
|
wh_ad_id text UNIQUE NOT NULL, -- willhaben's internal ID
|
|
raw_json jsonb NOT NULL, -- full API response payload
|
|
title text NOT NULL,
|
|
price numeric, -- parsed float; null if unknown
|
|
location text,
|
|
url text,
|
|
published_at timestamptz,
|
|
first_seen_at timestamptz DEFAULT now(),
|
|
main_image_url text -- migration 02
|
|
)
|
|
|
|
-- Junction: which query discovered which ad (enables per-user dedup)
|
|
query_ads (
|
|
search_query_id uuid REFERENCES search_queries(id) ON DELETE CASCADE,
|
|
ad_id uuid REFERENCES ads(id) ON DELETE CASCADE,
|
|
first_seen_at timestamptz DEFAULT now(),
|
|
is_notified boolean DEFAULT false,
|
|
PRIMARY KEY (search_query_id, ad_id)
|
|
)
|
|
|
|
-- Audit log of every Telegram notification sent
|
|
notifications (
|
|
id uuid PRIMARY KEY,
|
|
user_id uuid REFERENCES users(id) ON DELETE CASCADE,
|
|
ad_id uuid REFERENCES ads(id) ON DELETE SET NULL,
|
|
message_id int, -- Telegram message ID for reply/edit
|
|
sent_at timestamptz DEFAULT now()
|
|
)
|
|
|
|
-- Per-run health log; used for debugging and monitoring
|
|
scrape_logs (
|
|
id uuid PRIMARY KEY,
|
|
search_query_id uuid REFERENCES search_queries(id) ON DELETE CASCADE,
|
|
status text CHECK (status IN ('success', 'error', 'rate_limited')),
|
|
ads_found int DEFAULT 0,
|
|
new_ads int DEFAULT 0,
|
|
error_message text,
|
|
scraped_at timestamptz DEFAULT now()
|
|
)
|
|
|
|
-- Price change history; migration 02
|
|
price_history (
|
|
id uuid PRIMARY KEY,
|
|
ad_id uuid REFERENCES ads(id) ON DELETE CASCADE,
|
|
old_price numeric NOT NULL,
|
|
new_price numeric NOT NULL,
|
|
changed_at timestamptz DEFAULT now(),
|
|
UNIQUE (ad_id, old_price, new_price)
|
|
)
|
|
```
|
|
|
|
### Key indexes
|
|
|
|
| Index | Purpose |
|
|
|-------|---------|
|
|
| `idx_search_queries_active_scraped` | Partial index on `(is_active=true, last_scraped_at)` — scheduler polling |
|
|
| `idx_query_ads_notified` | Partial on `(search_query_id, is_notified=false)` — notifier lookups |
|
|
| `idx_notifications_user_sent` | `(user_id, sent_at DESC)` — per-user notification history |
|
|
| `idx_scrape_logs_query_at` | `(search_query_id, scraped_at DESC)` — latest runs per query |
|
|
| `idx_price_history_ad_id` | Fast price timeline lookup per ad |
|
|
|
|
---
|
|
|
|
## 4. Scheduler Loop
|
|
|
|
The scheduler runs as an infinite `asyncio` coroutine inside the worker container (`main.py:22`).
|
|
|
|
### Cycle
|
|
|
|
```python
|
|
while True:
|
|
# 1. Poll DB for due queries
|
|
rows = pool.fetch(
|
|
"SELECT sq.id, sq.keyword, sq.interval_minutes, u.telegram_id"
|
|
"FROM search_queries sq JOIN users u ON sq.user_id = u.id"
|
|
"WHERE sq.is_active AND (last_scraped_at IS NULL"
|
|
" OR last_scraped_at < now() - interval_minutes)"
|
|
)
|
|
|
|
# 2. Process each query
|
|
for row in rows:
|
|
ads_raw, _ = await fetch_ads(keyword) # call willhaben API
|
|
for ad_data in ads_raw:
|
|
fields = extract_ad_fields(ad_data) # parse JSON → dict
|
|
wh_ad_id = fields["wh_ad_id"]
|
|
|
|
# 3. Global dedup on wh_ad_id (UNIQUE constraint)
|
|
existing = pool.fetchrow("SELECT id FROM ads WHERE wh_ad_id = $1", wh_ad_id)
|
|
|
|
if not existing:
|
|
# New ad — INSERT, notify
|
|
...
|
|
else:
|
|
# Existing ad — check price drop
|
|
old_price = existing["price"]
|
|
new_price = fields["price"]
|
|
if old_price and new_price and new_price < old_price:
|
|
UPDATE ads SET price = $1
|
|
INSERT INTO price_history ON CONFLICT DO NOTHING
|
|
await notify_price_drop(...)
|
|
|
|
# 4. Query-level dedup via query_ads junction table
|
|
existing_qa = pool.fetchrow(
|
|
"SELECT 1 FROM query_ads WHERE search_query_id=$1 AND ad_id=$2", ...
|
|
)
|
|
if not existing_qa:
|
|
INSERT INTO query_ads (...)
|
|
await notify_new_ad(...)
|
|
mark_notified(pool, query_id, ad_uuid)
|
|
log_notification(pool, user_id, ad_uuid, msg_id)
|
|
|
|
UPDATE search_queries SET last_scraped_at = now() WHERE id = $1
|
|
INSERT INTO scrape_logs (status='success', ...)
|
|
|
|
await asyncio.sleep(30) # polling interval between cycles
|
|
```
|
|
|
|
### Key behaviors
|
|
|
|
- **Interval-driven**: Each query has its own `interval_minutes`. The scheduler selects queries whose `last_scraped_at` is stale relative to their interval.
|
|
- **Global ad dedup**: The `ads.wh_ad_id UNIQUE` constraint ensures an ad is stored once regardless of how many queries match it.
|
|
- **Per-query notification dedup**: Even though the ad row is global, the `query_ads` junction table tracks which query has already surfaced the ad to its owner. A new ad triggers a notification only when the `(search_query_id, ad_id)` pair is first inserted.
|
|
- **Price drop detection**: On every scrape cycle, existing ads are re-checked. If `new_price < old_price`, the row updates and a `price_history` record is created (`ON CONFLICT DO NOTHING` prevents duplicate entries for the same price transition).
|
|
- **Error resilience**: Per-query exceptions log to `scrape_logs(status='error')` and continue processing remaining queries. Top-level exceptions trigger a 30-second backoff before retrying the full cycle.
|
|
|
|
---
|
|
|
|
## 5. Notification Engine
|
|
|
|
### Fan-Out Model
|
|
|
|
The scheduler scrapes each keyword exactly once per cycle, then fans out notifications to ALL active subscribers of that keyword. This eliminates duplicate API calls when multiple users track the same keyword.
|
|
|
|
```
|
|
Scheduler → scrape "iphone 15" once (30 ads)
|
|
↓ detect events: 2 new, 1 price drop
|
|
┌──────────────┼──────────────┐
|
|
▼ ▼ ▼
|
|
Subscriber A Subscriber B Subscriber C
|
|
notify_new notify_new notify_new
|
|
notify_drop notify_drop notify_drop
|
|
```
|
|
|
|
### Delivery Mechanism
|
|
|
|
- Each notification is a separate Telegram API call (photo with caption, or text-only fallback)
|
|
- Subscribers are fetched in bulk per keyword: `SELECT u.telegram_id FROM keyword_subscriptions ks JOIN users u ...`
|
|
- Fan-out uses `asyncio.gather()` for parallel delivery across subscribers
|
|
- If subscriber count > 10, a 50ms sleep is inserted between each batch to respect Telegram's 30 msg/sec rate limit
|
|
|
|
### Event Types
|
|
|
|
| Function | Trigger | Header | Photo? |
|
|
|----------|---------|--------|--------|
|
|
| `notify_new_ad` | Ad not in ads table for this keyword cycle | "🆕 New listing found!" | Yes, if `main_image_url` exists |
|
|
| `notify_price_drop` | Existing ad price decreased | "⚠️ Price drop!" | Yes, if `main_image_url` exists |
|
|
|
|
### Message Format (HTML)
|
|
|
|
```
|
|
<emoji> <event_type>!
|
|
|
|
<b>Title Here</b>
|
|
|
|
💰 1,234 € 📍 LOCATION, POSTCODE
|
|
Published: d.m.Y H:M | Modified: d.m.Y H:M
|
|
[View Ad →] ← inline button
|
|
```
|
|
|
|
### Initial Load Behavior
|
|
|
|
On the first scrape cycle after a keyword is created (`keywords.initial_loaded = false`), all ads are silently indexed into the database without sending any notifications. The flag is set to `true` after the first complete cycle. Subsequent cycles notify normally for new ads and price drops.
|
|
|
|
### Delivery Guarantees & Error Handling
|
|
|
|
- **Best-effort**: Failed sends to one subscriber don't block delivery to others
|
|
- **No retry queue**: By design — keeps it simple for local deployment
|
|
- **Per-subscriber isolation**: Each Telegram API call is wrapped in its own try/except
|
|
- **Audit logging**: Every successful notification recorded in `notifications(user_id, ad_id, message_id)`
|
|
|
|
---
|
|
|
|
## 6. Telegram Bot
|
|
|
|
The bot uses python-telegram-bot v2x with **long polling** (`app.updater.start_polling()`). It shares the same `asyncio` event loop as the scheduler.
|
|
|
|
### Command set
|
|
|
|
| Command | Audience | Description |
|
|
|---------|----------|-------------|
|
|
| `/start` | Anyone | Auto-registers user in DB, shows help text |
|
|
| `/add <keyword>` | Registered | Creates a new search query; duplicate check via `ILIKE` |
|
|
| `/list` | Registered | Lists all queries with status, interval, last scraped time |
|
|
| `/pause <uuid>` | Registered | Sets `is_active = false` for the query (owner-only) |
|
|
| `/resume <uuid>` | Registered | Sets `is_active = true` |
|
|
| `/delete <uuid>` | Registered | Hard-deletes a query (CASCADE removes query_ads rows) |
|
|
| `/stats` | Registered | Aggregates: total queries, distinct ads tracked, notifications sent |
|
|
| `/adduser <id> [admin]` | Admin only | Inserts or activates a user; optional admin flag |
|
|
| `/removeuser <id>` | Admin only | Hard-deletes from users table (CASCADE) |
|
|
| `/users` | Admin only | Lists all registered users with role and status |
|
|
|
|
### Authentication model
|
|
|
|
- **Auto-registration**: `/start` creates a `users` row if the `telegram_id` doesn't exist. The user starts as `is_active=true`, `is_admin=false`.
|
|
- **Whitelist check**: Every command (except `/start`) calls `_require_user()`, which verifies the sender exists in DB and is active. Inactive users receive "Account deactivated."
|
|
- **Admin gate**: Admin commands call `_require_admin()`, querying for `is_admin=true AND is_active=true`.
|
|
|
|
---
|
|
|
|
## 7. Scraper
|
|
|
|
### API endpoint
|
|
|
|
```
|
|
GET https://www.willhaben.at/webapi/ad-search/search/atz/seo/kaufen-und-verkaufen/marktplatz
|
|
?keyword=<search>
|
|
&rows=30
|
|
&sort=1
|
|
```
|
|
|
|
| Parameter | Value | Effect |
|
|
|-----------|-------|--------|
|
|
| `keyword` | user-provided text | Full-text search on willhaben |
|
|
| `rows` | `30` | Maximum results per page |
|
|
| `sort` | `1` | Sort by newest first |
|
|
|
|
Only **page 1** is fetched. The design trades completeness for speed and API politeness — the newest 30 ads are typically sufficient for early-bird notifications.
|
|
|
|
### Headers
|
|
|
|
```python
|
|
{
|
|
"accept": "application/json",
|
|
"user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
|
|
"x-wh-client": "api@willhaben.at;responsive_web;server;1.0.0;desktop",
|
|
}
|
|
```
|
|
|
|
### Retry logic
|
|
|
|
Three attempts with exponential backoff: `2^1s`, `2^2s`. On third failure, the exception propagates to the scheduler's error handler.
|
|
|
|
### Attribute parsing
|
|
|
|
The willhaben API returns ad attributes as a flat list of `{name, values[]}` objects. The parser extracts these into a dict keyed by attribute name:
|
|
|
|
```python
|
|
def _parse_attributes(ad_dict) -> dict[str, str]:
|
|
result = {}
|
|
for attr in ad_dict["attributes"]["attribute"]:
|
|
name = attr.get("name")
|
|
values = attr.get("values", [])
|
|
if name and values:
|
|
result[name] = values[0] # take first value only
|
|
return result
|
|
```
|
|
|
|
Key attribute mappings:
|
|
- `HEADING` → ad title (fallback to `description`)
|
|
- `PRICE/AMOUNT` → numeric price (commas stripped, parsed as float)
|
|
- `LOCATION` → display location string
|
|
- `SEO_URL` → reconstructs full URL as `/iad/{seo_url}`
|
|
- `PUBLISHED_String` / `CHANGED_String` → ISO 8601 timestamp
|
|
|
|
The main image is extracted from the first entry in `advertImageList.advertImage.referenceImageUrl`.
|
|
|
|
---
|
|
|
|
## 8. Key Decisions and Tradeoffs
|
|
|
|
### Single worker container
|
|
|
|
Both the Telegram bot (long polling) and the scraping scheduler run as concurrent `asyncio` tasks within one Python process. This avoids inter-service communication complexity, reduces resource overhead (one container vs. two), and simplifies shared state — both coroutines use the same `asyncpg.Pool`. The tradeoff is that a crash in either component takes down the other; however, Docker's `restart: unless-stopped` policy recovers automatically.
|
|
|
|
### Global ad dedup (`ads.wh_ad_id UNIQUE`)
|
|
|
|
An ad discovered by multiple users' queries shares a single row. This saves storage and ensures price history tracks the true market price of the listing, not per-user observations. The cost is that the `query_ads` junction table must be consulted to determine whether *this particular user's query* has already seen the ad — adding an extra DB lookup per ad per cycle.
|
|
|
|
### DB-driven whitelist vs. env-var list
|
|
|
|
Users are stored in PostgreSQL rather than a static allowlist in environment variables or config files. Benefits:
|
|
- **Admin commands** (`/adduser`, `/removeuser`) manage access without redeploying the container.
|
|
- **Per-user data** (search queries, notification history) is naturally relational.
|
|
- **Role-based authorization** (`is_admin` flag) is a simple column check rather than application logic branching on env var patterns.
|
|
|
|
The tradeoff is that an attacker with DB write access could grant themselves admin; however, the `worker` container only reads/writes through parameterized queries — there's no exposed SQL interface. The PostgREST/Kong layer handles external API auth separately.
|
|
|
|
### Page-1-only scraping
|
|
|
|
Fetching only 30 ads per cycle (page 1, sorted newest) keeps API calls lightweight and reduces latency per scheduler tick. For a marketplace like willhaben where new listings appear continuously, the first page captures fresh inventory. Missing older pages means ads that were posted before the user's query was created won't be backfilled — but the `/add` command is designed for forward-looking monitoring, not historical discovery.
|
|
|
|
### asyncpg direct connection (bypassing PostgREST)
|
|
|
|
The worker talks to Postgres directly rather than routing through Kong → PostgREST. This avoids HTTP overhead on the hot path (every ad per every query per cycle), gives full SQL flexibility (complex JOINs in the scheduler poll, conditional UPDATE/INSERT patterns), and eliminates JWT token management for internal traffic. PostgREST remains available for the Studio dashboard and any future REST clients.
|
|
|
|
### Global keyword dedup (`keywords` + `keyword_subscriptions`)
|
|
|
|
Multiple users can subscribe to the same keyword without duplicating API calls or ad storage. One scrape feeds all subscribers via fan-out notification. The cost is that a subscriber cannot independently control their scrape interval — it's shared per keyword (set by the first user, adjustable only while no other users are subscribed). This trades per-user granularity for API efficiency and consistent price tracking across all observers of the same listing.
|