From fbbdc5e54b20bedb44c6875cd5902c12d25f0877 Mon Sep 17 00:00:00 2001 From: opencode Date: Tue, 16 Jun 2026 19:12:35 +0200 Subject: [PATCH] feat: python worker (bot, scraper, notifier, scheduler) --- worker/Dockerfile | 6 + worker/requirements.txt | 4 + worker/src/bot.py | 312 ++++++++++++++++++++++++++++++++++++++++ worker/src/db.py | 31 ++++ worker/src/main.py | 202 ++++++++++++++++++++++++++ worker/src/notifier.py | 78 ++++++++++ worker/src/scraper.py | 96 +++++++++++++ 7 files changed, 729 insertions(+) create mode 100644 worker/Dockerfile create mode 100644 worker/requirements.txt create mode 100644 worker/src/bot.py create mode 100644 worker/src/db.py create mode 100644 worker/src/main.py create mode 100644 worker/src/notifier.py create mode 100644 worker/src/scraper.py diff --git a/worker/Dockerfile b/worker/Dockerfile new file mode 100644 index 0000000..062da9e --- /dev/null +++ b/worker/Dockerfile @@ -0,0 +1,6 @@ +FROM python:3.12-slim +WORKDIR /app +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt +COPY src/ . 
+CMD ["python", "main.py"] diff --git a/worker/requirements.txt b/worker/requirements.txt new file mode 100644 index 0000000..3a74ff8 --- /dev/null +++ b/worker/requirements.txt @@ -0,0 +1,4 @@ +python-telegram-bot==21.4 +asyncpg==0.30.0 +httpx==0.27.2 +python-dotenv==1.0.1 diff --git a/worker/src/bot.py b/worker/src/bot.py new file mode 100644 index 0000000..60aab85 --- /dev/null +++ b/worker/src/bot.py @@ -0,0 +1,312 @@ +import os +import logging +import uuid + +import asyncpg +from telegram import Update +from telegram.ext import ContextTypes, CommandHandler, ExtBot + +from db import get_pool + +logger = logging.getLogger(__name__) + + +def _admin_ids() -> set[int]: + raw = os.getenv("ADMIN_TELEGRAM_IDS", "") + return {int(x.strip()) for x in raw.split(",") if x.strip()} + + +async def _require_user(update: Update, context: ContextTypes.DEFAULT_TYPE) -> asyncpg.Row | None: # type: ignore[name-defined] + telegram_id = update.effective_user.id + pool = await get_pool() + row = await pool.fetchrow( + "SELECT id, is_active FROM users WHERE telegram_id = $1", + telegram_id, + ) + if not row or not row["is_active"]: + await update.message.reply_text("Access denied. 
def register_handlers(bot: ExtBot) -> None:
    """Attach every command handler defined in this module to ``bot``.

    NOTE(review): the parameter is annotated ``ExtBot``, but ``add_handler``
    is a method of ``telegram.ext.Application``; the caller presumably has to
    pass the application object -- confirm against the call site.
    """
    commands = (
        ("start", start_handler),
        ("add", add_handler),
        ("list", list_handler),
        ("pause", pause_handler),
        ("resume", resume_handler),
        ("delete", delete_handler),
        ("stats", stats_handler),
        ("users", users_handler),
    )
    for command, callback in commands:
        bot.add_handler(CommandHandler(command, callback))


# -- /start ---------------------------------------------------------------


async def start_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle ``/start``: greet an authorised user and list the commands."""
    row = await _require_user(update, context)
    # effective_message also covers edited messages, where update.message is None.
    message = update.effective_message
    if not row or message is None:
        return

    user = update.effective_user
    name = user.first_name or f"user {user.id}"
    await message.reply_text(
        f"Hello {name}! I'll notify you about new willhaben listings.\n\n"
        "Available commands:\n"
        "/add — Start tracking a keyword\n"
        "/list — List your active searches\n"
        "/pause — Pause a search\n"
        "/resume — Resume a paused search\n"
        "/delete — Delete a search\n"
        "/stats — View your tracking statistics",
    )


# -- /add -----------------------------------------------------------------


async def add_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle ``/add``: start tracking a keyword for the calling user.

    Everything after the first space of the command text is the keyword;
    surrounding whitespace and quote characters are stripped. A keyword the
    user already tracks is reported and not inserted again.
    """
    row = await _require_user(update, context)
    message = update.effective_message
    if not row or message is None:
        return

    _, _, remainder = (message.text or "").partition(" ")
    keyword = remainder.strip().strip("'\"")
    if not keyword:
        await message.reply_text("Usage: /add <keyword>")
        return

    pool = await get_pool()
    existing = await pool.fetchrow(
        "SELECT id FROM search_queries WHERE user_id = $1 AND keyword = $2",
        row["id"],
        keyword,
    )
    if existing:
        await message.reply_text(f"Query already exists: {keyword}")
        return

    interval = int(os.getenv("DEFAULT_INTERVAL_MINUTES", "60"))
    query_id = str(uuid.uuid4())
    await pool.execute(
        "INSERT INTO search_queries (id, user_id, keyword, interval_minutes) VALUES ($1, $2, $3, $4)",
        query_id,
        row["id"],
        keyword,
        interval,
    )

    await message.reply_text(
        f"✓ Tracking \"{keyword}\"\n"
        f"Query ID: {query_id}\n"
        f"Check interval: every {interval} minutes",
    )
    logger.info("User %s added query '%s' (%s)", update.effective_user.id, keyword, query_id)


# -- /list ----------------------------------------------------------------


async def list_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle ``/list``: show all searches of the calling user, newest first."""
    row = await _require_user(update, context)
    message = update.effective_message
    if not row or message is None:
        return

    pool = await get_pool()
    queries = await pool.fetch(
        "SELECT id, keyword, interval_minutes, is_active, last_scraped_at "
        "FROM search_queries WHERE user_id = $1 ORDER BY created_at DESC",
        row["id"],
    )
    if not queries:
        await message.reply_text("No active searches. Use /add to start.")
        return

    entries = []
    for i, q in enumerate(queries, 1):
        status = "active" if q["is_active"] else "paused"
        scraped = q["last_scraped_at"]
        last = scraped.strftime("%d.%m.%Y %H:%M") if scraped else "never"
        entries.append(
            f"{i}. \"{q['keyword']}\"\n"
            f" ID: {q['id']}\n"
            f" Interval: {q['interval_minutes']} min | Status: {status}\n"
            f" Last scraped: {last}"
        )

    await message.reply_text("\n\n".join(entries))


# -- /pause, /resume, /delete ---------------------------------------------


async def _change_query(
    update: Update,
    context: ContextTypes.DEFAULT_TYPE,
    *,
    command: str,
    sql: str,
    verb: str,
) -> None:
    """Run a single-query command (pause/resume/delete) for the calling user.

    Args:
        command: Command name without the slash, used in the usage hint.
        sql: Statement taking ``$1`` = query id and ``$2`` = owning user id.
        verb: Past-tense verb used in the confirmation and the log line.
    """
    row = await _require_user(update, context)
    message = update.effective_message
    if not row or message is None:
        return

    parts = (message.text or "").split()
    if len(parts) < 2:
        await message.reply_text(f"Usage: /{command} <query_id>")
        return

    try:
        # Canonicalise so the database always receives the hyphenated form,
        # whatever spelling uuid.UUID accepted.
        query_id = str(uuid.UUID(parts[1]))
    except ValueError:
        await message.reply_text("Invalid query ID format.")
        return

    pool = await get_pool()
    status = await pool.execute(sql, query_id, row["id"])

    # The status is the command tag, e.g. "UPDATE 1" or "DELETE 0"; the last
    # token is the affected-row count.
    if status.rsplit(" ", 1)[-1] == "0":
        await message.reply_text("Query not found or access denied.")
        return

    await message.reply_text(f"✓ Query {verb}: {query_id}")
    logger.info("User %s %s query %s", update.effective_user.id, verb, query_id)


async def pause_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle ``/pause``: deactivate one of the caller's searches."""
    await _change_query(
        update,
        context,
        command="pause",
        sql="UPDATE search_queries SET is_active = false WHERE id = $1 AND user_id = $2",
        verb="paused",
    )


async def resume_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle ``/resume``: reactivate one of the caller's searches."""
    await _change_query(
        update,
        context,
        command="resume",
        sql="UPDATE search_queries SET is_active = true WHERE id = $1 AND user_id = $2",
        verb="resumed",
    )


async def delete_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle ``/delete``: remove one of the caller's searches."""
    await _change_query(
        update,
        context,
        command="delete",
        sql="DELETE FROM search_queries WHERE id = $1 AND user_id = $2",
        verb="deleted",
    )
async def users_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle ``/users`` (admin only): list every registered user.

    Callers whose Telegram ID is not in ``_admin_ids()`` get "Unauthorized".
    The listing is sent in several messages when it would exceed Telegram's
    per-message length limit, so it keeps working as the user table grows.
    """
    message = update.effective_message
    if message is None:
        return

    caller = update.effective_user
    if caller is None or caller.id not in _admin_ids():
        await message.reply_text("Unauthorized")
        return

    pool = await get_pool()
    users = await pool.fetch(
        "SELECT telegram_id, username, first_name, is_active, created_at FROM users ORDER BY created_at DESC"
    )
    if not users:
        await message.reply_text("No users registered.")
        return

    # Telegram caps a text message at 4096 characters; stay safely below it.
    max_len = 4000
    chunk = "👥 Registered users:\n"
    for u in users:
        status = "active" if u["is_active"] else "inactive"
        name = u["username"] or u["first_name"] or "unknown"
        line = f"• {u['telegram_id']} | @{name} | {status}"
        if len(chunk) + len(line) + 1 > max_len:
            await message.reply_text(chunk)
            chunk = line
        else:
            chunk = f"{chunk}\n{line}"
    await message.reply_text(chunk)
async def _process_ad(
    pool: object,
    bot: ExtBot,
    ad_data: dict,
    *,
    query_id: str,
    keyword: str,
    telegram_id: int,
    user_id: str,
) -> bool:
    """Store one scraped ad and notify the user if it is new for the query.

    Returns:
        True when the ad was newly linked to the query (and a notification
        was attempted), False when it was already known or had no id.
    """
    fields = extract_ad_fields(ad_data)
    wh_ad_id = fields["wh_ad_id"]
    if not wh_ad_id:
        # Without an id every such ad would collapse onto the same ads row.
        logger.warning("Skipping ad without an id for query %s", query_id)
        return False

    existing = await pool.fetchrow(
        "SELECT id FROM ads WHERE wh_ad_id = $1",
        wh_ad_id,
    )
    if existing:
        ad_uuid = str(existing["id"])
    else:
        # NOTE(review): published_at is passed as the ISO string produced by
        # extract_ad_fields; if the column is a timestamp type the driver may
        # require a datetime object -- confirm against the schema.
        ad_row = await pool.fetchrow(
            """
            INSERT INTO ads (wh_ad_id, raw_json, title, price, location, url, published_at)
            VALUES ($1, $2, $3, $4, $5, $6, $7)
            RETURNING id
            """,
            wh_ad_id,
            json.dumps(ad_data),
            fields["title"],
            fields["price"],
            fields["location"],
            fields["url"],
            fields.get("published_at"),
        )
        ad_uuid = str(ad_row["id"])

    already_linked = await pool.fetchrow(
        "SELECT 1 FROM query_ads WHERE search_query_id = $1 AND ad_id = $2",
        query_id,
        ad_uuid,
    )
    if already_linked:
        return False

    await pool.execute(
        "INSERT INTO query_ads (search_query_id, ad_id) VALUES ($1, $2)",
        query_id,
        ad_uuid,
    )

    await notify_new_ad(bot, telegram_id, {**fields, "keyword": keyword})
    await mark_notified(pool, query_id, ad_uuid)
    try:
        # The return value of notify_new_ad is not used here, so 0 is stored
        # as a placeholder message id.
        await log_notification(pool, user_id, ad_uuid, 0)
    except Exception:
        logger.exception("Failed to log notification")

    logger.info(
        "New ad %s found for query %s (keyword=%s)",
        wh_ad_id,
        query_id,
        keyword,
    )
    return True


async def _scrape_query(pool: object, bot: ExtBot, row: object) -> None:
    """Scrape one due search query and record the outcome in scrape_logs.

    Any failure is logged and written to scrape_logs as an 'error' row; it
    never propagates, so one broken query cannot stop the others. On failure
    last_scraped_at is left untouched, so the query is due again on the next
    scheduler pass.
    """
    query_id = str(row["id"])
    keyword = row["keyword"]

    logger.info("Scraping keyword '%s' for query %s", keyword, query_id)

    try:
        ads_raw, _total_hits = await fetch_ads(keyword)

        new_count = 0
        for ad_data in ads_raw:
            is_new = await _process_ad(
                pool,
                bot,
                ad_data,
                query_id=query_id,
                keyword=keyword,
                telegram_id=row["telegram_id"],
                user_id=str(row["user_id"]),
            )
            if is_new:
                new_count += 1

        await pool.execute(
            "UPDATE search_queries SET last_scraped_at = now() WHERE id = $1",
            query_id,
        )
        await pool.execute(
            """
            INSERT INTO scrape_logs (search_query_id, status, ads_found, new_ads)
            VALUES ($1, 'success', $2, $3)
            """,
            query_id,
            len(ads_raw),
            new_count,
        )
    except Exception as exc:
        logger.exception("Error scraping keyword '%s' (query %s)", keyword, query_id)
        try:
            await pool.execute(
                """
                INSERT INTO scrape_logs (search_query_id, status, error_message)
                VALUES ($1, 'error', $2)
                """,
                query_id,
                str(exc),
            )
        except Exception:
            # The database itself may be the problem; keep going regardless.
            logger.exception("Failed to record scrape error for query %s", query_id)


async def scheduler_task(pool: object, bot: ExtBot) -> None:
    """Run forever, scraping every active search query whose interval elapsed.

    Each pass selects the due queries, processes them one after another with
    a 5 second pause after each, then sleeps 30 seconds before the next pass.
    The loop only ends when the task is cancelled.

    Args:
        pool: Database pool exposing ``fetch``/``fetchrow``/``execute``.
        bot: Bot used to send notifications.
    """
    while True:
        try:
            rows = await pool.fetch(
                """
                SELECT sq.id, sq.keyword, sq.interval_minutes, u.telegram_id, u.id AS user_id
                FROM search_queries sq
                JOIN users u ON sq.user_id = u.id
                WHERE sq.is_active = true
                AND (sq.last_scraped_at IS NULL OR
                sq.last_scraped_at < now() - (sq.interval_minutes || ' minutes')::interval)
                """
            )

            for row in rows:
                await _scrape_query(pool, bot, row)
                # Space out requests between consecutive queries.
                await asyncio.sleep(5)

        except Exception:
            logger.exception("Scheduler iteration error")

        await asyncio.sleep(30)


async def main() -> None:
    """Start the Telegram bot and the scrape scheduler; run until SIGINT/SIGTERM.

    Startup order is initialize -> start -> start_polling, and the scheduler
    is launched only afterwards so it never sends through an uninitialised
    bot. Shutdown reverses that order and always closes the database pool.

    Exits the process with status 1 when ``TELEGRAM_BOT_TOKEN`` is not set.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    )

    token = os.getenv("TELEGRAM_BOT_TOKEN")
    if not token:
        logger.error("TELEGRAM_BOT_TOKEN is required")
        sys.exit(1)

    from bot import register_handlers  # noqa: E402

    pool = await get_pool()

    app = Application.builder().token(token).build()

    # Handlers are registered on the Application; the bot object has no
    # add_handler method.
    register_handlers(app)

    loop = asyncio.get_running_loop()
    stop = loop.create_future()

    def _signal_handler() -> None:
        if not stop.done():
            stop.set_result(True)

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, _signal_handler)

    scheduler = None
    try:
        await app.initialize()
        await app.start()
        # start_polling returns once polling runs in the background; it is
        # stopped via updater.stop(), not by cancelling a task.
        await app.updater.start_polling()  # type: ignore[union-attr]
        logger.info("Bot started with long polling")

        scheduler = asyncio.ensure_future(scheduler_task(pool, app.bot))

        await stop
        logger.info("Shutting down...")

    finally:
        if scheduler is not None:
            scheduler.cancel()
            with suppress(asyncio.CancelledError):
                await scheduler

        # Guard each step so a failure during startup still shuts down cleanly.
        if app.updater is not None and app.updater.running:
            await app.updater.stop()
        if app.running:
            await app.stop()
        await app.shutdown()
        await close_pool()
        logger.info("Shutdown complete")
async def mark_notified(pool: asyncpg.Pool, search_query_id: str, ad_id: str) -> None:
    """Set ``is_notified`` on the query_ads row linking the query and the ad."""
    statement = (
        "UPDATE query_ads SET is_notified = true WHERE search_query_id = $1 AND ad_id = $2"
    )
    await pool.execute(statement, search_query_id, ad_id)


async def log_notification(
    pool: asyncpg.Pool, user_id: str, ad_id: str, message_id: int
) -> None:
    """Insert one row into ``notifications`` for a message sent to a user."""
    values = (user_id, ad_id, message_id)
    await pool.execute(
        "INSERT INTO notifications (user_id, ad_id, message_id) VALUES ($1, $2, $3)",
        *values,
    )
client.get(_API_URL, headers=_HEADERS, params=params) + resp.raise_for_status() + data = resp.json() + break + except Exception as exc: + logger.warning("fetch_ads attempt %d failed: %s", attempt, exc) + if attempt < 3: + await asyncio.sleep(2 ** attempt) + continue + raise + + ads_raw = data.get("advertSummaryList", {}).get("advertSummary", []) + total_hits = int(data.get("rowsFound", 0)) + return ads_raw, total_hits + + +def _parse_attributes(ad_dict: dict[str, Any]) -> dict[str, str]: + attr_list = ad_dict.get("attributes", {}).get("attribute", []) + result: dict[str, str] = {} + for attr in attr_list: + name = attr.get("name") + values = attr.get("values", []) + if name and values: + result[name] = values[0] + return result + + +def extract_ad_fields(ad_dict: dict[str, Any]) -> dict[str, Any]: + attrs = _parse_attributes(ad_dict) + + title = attrs.get("HEADING") or ad_dict.get("description", "") + + # Price from "PRICE/AMOUNT" attribute (API format) + price_raw = attrs.get("PRICE/AMOUNT") + price: float | None = None + if price_raw is not None: + try: + price = float(str(price_raw).replace(",", "")) + except (ValueError, TypeError): + pass + + location = attrs.get("LOCATION") + + seo_url = attrs.get("SEO_URL") + url = f"https://www.willhaben.at/iad/{seo_url}" if seo_url else None + + # Published time from CHANGED_String or PUBLISHED_String (ISO 8601) + published_raw = attrs.get("PUBLISHED_String") or attrs.get("CHANGED_String") + published_at: str | None = None + if published_raw: + try: + dt = datetime.fromisoformat(published_raw.replace("Z", "+00:00")) + published_at = dt.isoformat() + except (ValueError, TypeError): + pass + + return { + "wh_ad_id": str(ad_dict.get("id", "")), + "title": title or "", + "price": price, + "location": location, + "url": url, + "published_at": published_at, + }