diff --git a/worker/src/bot.py b/worker/src/bot.py index fe67e60..376aef1 100644 --- a/worker/src/bot.py +++ b/worker/src/bot.py @@ -51,7 +51,6 @@ async def _auto_register(update: Update) -> dict[str, Any] | None: telegram_id, ) if existing: - # Update name info in case it changed await pool.execute( "UPDATE users SET username = $1, first_name = $2 WHERE telegram_id = $3", username, @@ -79,8 +78,6 @@ def register_handlers(app: Application) -> None: app.add_handler(CommandHandler("start", start_handler)) app.add_handler(CommandHandler("add", add_handler)) app.add_handler(CommandHandler("list", list_handler)) - app.add_handler(CommandHandler("pause", pause_handler)) - app.add_handler(CommandHandler("resume", resume_handler)) app.add_handler(CommandHandler("delete", delete_handler)) app.add_handler(CommandHandler("stats", stats_handler)) app.add_handler(CommandHandler("adduser", adduser_handler)) @@ -99,11 +96,9 @@ async def start_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> N await update.message.reply_text( # type: ignore[union-attr] f"Hello {name}! I'll notify you about new willhaben listings.\n\n" "Available commands:\n" - "/add — Start tracking a keyword\n" - "/list — List your active searches\n" - "/pause — Pause a search\n" - "/resume — Resume a paused search\n" - "/delete — Delete a search\n" + "/add — Subscribe to a keyword (shared across users)\n" + "/list — List your subscriptions\n" + "/delete — Unsubscribe from a keyword\n" "/stats — View your tracking statistics", ) @@ -123,28 +118,48 @@ async def add_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> Non keyword = parts[1].strip().strip("'\"") pool = await get_pool() + existing = await pool.fetchrow( - "SELECT id FROM search_queries WHERE user_id = $1 AND keyword ILIKE $2", - row["id"], + "SELECT id, is_active FROM keywords WHERE LOWER(keyword) = LOWER($1)", keyword, ) - if existing: - await update.message.reply_text(f"Query already exists: {keyword}") # type: ignore[union-attr] - return - query_id = str(uuid.uuid4()) + if not existing: + kw_id = str(uuid.uuid4()) + await pool.execute( + "INSERT INTO keywords (id, keyword, interval_minutes, is_active) VALUES ($1, $2, 60, true)", + kw_id, + keyword, + ) + logger.info("Created new keyword '%s' (%s)", keyword, kw_id) + else: + kw_id = existing["id"] + await pool.execute( - "INSERT INTO search_queries (id, user_id, keyword) VALUES ($1, $2, $3)", - query_id, + "INSERT INTO keyword_subscriptions (keyword_id, user_id) VALUES ($1, $2) ON CONFLICT DO NOTHING", + kw_id, row["id"], - keyword, ) - await update.message.reply_text( # type: ignore[union-attr] - f"Tracking \"{keyword}\"\n" - f"Query ID: {query_id}", + sub_count_row = await pool.fetchrow( + "SELECT COUNT(*) - 1 AS others FROM keyword_subscriptions WHERE keyword_id = $1", + kw_id, ) - logger.info("User %s added query '%s' (%s)", update.effective_user.id, keyword, query_id) + other_count = sub_count_row["others"] + + if other_count > 0: + await update.message.reply_text( # type: ignore[union-attr] + f"Tracking \"{keyword}\"\n" + f"Keyword ID: {kw_id}\n" + f"({other_count} other subscriber(s))", + ) + else: + await update.message.reply_text( # type: ignore[union-attr] + f"Tracking \"{keyword}\"\n" + f"Keyword ID: {kw_id}", + ) + + logger.info("User %s subscribed to keyword '%s' (%s)", update.effective_user.id, keyword, kw_id) # -- /list ---------------------------------------------------------------- @@ -155,98 +170,36 @@ async def list_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No return pool = await get_pool() - queries = await pool.fetch( - "SELECT id, keyword, interval_minutes, is_active, last_scraped_at " - "FROM search_queries WHERE user_id = $1 ORDER BY created_at DESC", + subscriptions = await pool.fetch( + "SELECT kw.id, kw.keyword, kw.is_active, kw.last_scraped_at, COUNT(ks2.user_id) AS subs " + "FROM keyword_subscriptions ks " + "JOIN keywords kw ON kw.id = ks.keyword_id " + "LEFT JOIN keyword_subscriptions ks2 ON ks2.keyword_id = ks.keyword_id " + "WHERE ks.user_id = $1 " + "GROUP BY kw.id " + "ORDER BY ks.created_at DESC", row["id"], ) - if not queries: - await update.message.reply_text("No active searches. Use /add to start.") # type: ignore[union-attr] + if not subscriptions: + await update.message.reply_text("No active subscriptions. Use /add to start.") # type: ignore[union-attr] return lines = [] - for i, q in enumerate(queries, 1): - status = "active" if q["is_active"] else "paused" - last = q["last_scraped_at"].strftime("%d.%m.%Y %H:%M") if q["last_scraped_at"] else "never" + for i, s in enumerate(subscriptions, 1): + status = "active" if s["is_active"] else "inactive" + last = s["last_scraped_at"].strftime("%d.%m.%Y %H:%M") if s["last_scraped_at"] else "never" + subs = f" | {s['subs']} subscriber(s)" if s["subs"] > 1 else "" lines.append( - f"{i}. \"{q['keyword']}\"\n" - f" ID: {q['id']}\n" - f" Interval: {q['interval_minutes']} min | Status: {status}\n" + f"{i}. \"{s['keyword']}\"\n" + f" ID: {s['id']}\n" + f" Status: {status}{subs}\n" f" Last scraped: {last}" ) await update.message.reply_text("\n\n".join(lines)) # type: ignore[union-attr] -# -- /pause --------------------------------------------------------------- - -async def pause_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - row = await _require_user(update) - if not row: - return - - text = (update.message or update.callback_query).text or "" # type: ignore[union-attr] - parts = text.split() - if len(parts) < 2: - await update.message.reply_text("Usage: /pause ") # type: ignore[union-attr] - return - - query_id = parts[1].strip() - try: - uuid.UUID(query_id) - except ValueError: - await update.message.reply_text("Invalid query ID format.") # type: ignore[union-attr] - return - - pool = await get_pool() - result = await pool.execute( - "UPDATE search_queries SET is_active = false WHERE id = $1 AND user_id = $2", - query_id, - row["id"], - ) - - if "1" not in result: - await update.message.reply_text("Query not found or access denied.") # type: ignore[union-attr] - return - - await update.message.reply_text(f"Query paused: {query_id}") # type: ignore[union-attr] - - -# -- /resume -------------------------------------------------------------- - -async def resume_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - row = await _require_user(update) - if not row: - return - - text = (update.message or update.callback_query).text or "" # type: ignore[union-attr] - parts = text.split() - if len(parts) < 2: - await update.message.reply_text("Usage: /resume ") # type: ignore[union-attr] - return - - query_id = parts[1].strip() - try: - uuid.UUID(query_id) - except ValueError: - await update.message.reply_text("Invalid query ID format.") # type: ignore[union-attr] - return - - pool = await get_pool() - result = await pool.execute( - "UPDATE search_queries SET is_active = true WHERE id = $1 AND user_id = $2", - query_id, - row["id"], - ) - - if "1" not in result: - await update.message.reply_text("Query not found or access denied.") # type: ignore[union-attr] - return - - await update.message.reply_text(f"Query resumed: {query_id}") # type: ignore[union-attr] - - # -- /delete -------------------------------------------------------------- async def delete_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: @@ -257,28 +210,49 @@ async def delete_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> text = (update.message or update.callback_query).text or "" # type: ignore[union-attr] parts = text.split() if len(parts) < 2: - await update.message.reply_text("Usage: /delete ") # type: ignore[union-attr] + await update.message.reply_text("Usage: /delete ") # type: ignore[union-attr] return - query_id = parts[1].strip() + keyword_id = parts[1].strip() try: - uuid.UUID(query_id) + uuid.UUID(keyword_id) except ValueError: - await update.message.reply_text("Invalid query ID format.") # type: ignore[union-attr] + await update.message.reply_text("Invalid keyword ID format.") # type: ignore[union-attr] return pool = await get_pool() + + kw_row = await pool.fetchrow( + "SELECT keyword FROM keywords WHERE id = $1", + keyword_id, + ) + if not kw_row: + await update.message.reply_text("Keyword not found.") # type: ignore[union-attr] + return + result = await pool.execute( - "DELETE FROM search_queries WHERE id = $1 AND user_id = $2", - query_id, + "DELETE FROM keyword_subscriptions WHERE keyword_id = $1 AND user_id = $2", + keyword_id, row["id"], ) - if "1" not in result: - await update.message.reply_text("Query not found or access denied.") # type: ignore[union-attr] + if "0" in result: + await update.message.reply_text("Not subscribed to this keyword.") # type: ignore[union-attr] return - await update.message.reply_text(f"Query deleted: {query_id}") # type: ignore[union-attr] + remaining = await pool.fetchval( + "SELECT COUNT(*) FROM keyword_subscriptions WHERE keyword_id = $1", + keyword_id, + ) + + if remaining == 0: + await pool.execute( + "UPDATE keywords SET is_active = false WHERE id = $1", + keyword_id, + ) + + await update.message.reply_text(f"Unsubscribed from \"{kw_row['keyword']}\"") # type: ignore[union-attr] + logger.info("User %s unsubscribed from keyword '%s' (%s)", update.effective_user.id, kw_row["keyword"], keyword_id) # -- /stats --------------------------------------------------------------- @@ -290,16 +264,15 @@ async def stats_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> N pool = await get_pool() - total_queries = await pool.fetchval( - "SELECT COUNT(*) FROM search_queries WHERE user_id = $1", + total_keywords = await pool.fetchval( + "SELECT COUNT(DISTINCT ks.keyword_id) " + "FROM keyword_subscriptions ks " + "WHERE ks.user_id = $1", row["id"], ) total_ads = await pool.fetchval( - "SELECT COUNT(DISTINCT qa.ad_id) " - "FROM query_ads qa JOIN search_queries sq ON qa.search_query_id = sq.id " - "WHERE sq.user_id = $1", - row["id"], + "SELECT COUNT(*) FROM ads" ) total_notifications = await pool.fetchval( @@ -309,7 +282,7 @@ async def stats_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> N text = ( f"Your stats:\n\n" - f"Queries: {total_queries}\n" + f"Keywords subscribed: {total_keywords}\n" f"Ads tracked: {total_ads}\n" f"Notifications sent: {total_notifications}" ) diff --git a/worker/src/notifier.py b/worker/src/notifier.py index f3a4101..ee3340b 100644 --- a/worker/src/notifier.py +++ b/worker/src/notifier.py @@ -1,5 +1,6 @@ import logging from datetime import datetime +from typing import Any import asyncpg from telegram import InlineKeyboardButton, InlineKeyboardMarkup @@ -8,63 +9,139 @@ from telegram.ext import ExtBot logger = logging.getLogger(__name__) -async def notify_new_ad( - bot: ExtBot, - telegram_id: int, - ad: dict, -) -> None: +def _build_keyboard(ad: dict[str, Any]) -> InlineKeyboardMarkup | None: + keyboard: list[list[InlineKeyboardButton]] = [] + if ad.get("url"): + keyboard.append( + [InlineKeyboardButton("View Ad →", url=ad["url"])] + ) + return InlineKeyboardMarkup(keyboard) if keyboard else None + + +def _format_text(header: str, ad: dict[str, Any]) -> str: + title = ad.get("title", "Untitled") + price_str = f"{ad['price']:,.0f}" if ad.get("price") is not None else "N/A" + location_part = "" + if ad.get("location"): + if ad.get("postcode"): + location_part = f" 📍 {ad['location']}, {ad['postcode']}" + else: + location_part = f" 📍 {ad['location']}" + published_str = "" if ad.get("published_at"): - try: - dt = datetime.fromisoformat(ad["published_at"]) - published_str = f"Published: {dt:%d.%m.%Y %H:%M}" - except (ValueError, TypeError): - published_str = f"Published: {ad['published_at']}" + pub = ad["published_at"] + if isinstance(pub, datetime): + published_str = f"{pub.strftime('%d.%m.%Y %H:%M')}" + else: + try: + dt = datetime.fromisoformat(str(pub)) + published_str = f"{dt.strftime('%d.%m.%Y %H:%M')}" + except (ValueError, TypeError): + published_str = str(pub) - location_line = f" | 📍 {ad['location']}" if ad.get("location") else "" + modified_str = "" + if ad.get("modified_at"): + mod = ad["modified_at"] + if isinstance(mod, datetime): + modified_str = f"{mod.strftime('%d.%m.%Y %H:%M')}" + else: + try: + dt = datetime.fromisoformat(str(mod)) + modified_str = f"{dt.strftime('%d.%m.%Y %H:%M')}" + except (ValueError, TypeError): + modified_str = str(mod) - text = ( - f"🔍 New listing found!\n\n" - f"{ad.get('title', 'Untitled')}\n" - f"💰 {price_str} €{location_line}\n" - ) + text_lines = [ + header, + "", + f"{title}", + "", + f"💰 {price_str} €{location_part}", + ] if published_str: - text += f"{published_str}\n" + pub_line = f"Published: {published_str}" + if modified_str: + pub_line += f" | Modified: {modified_str}" + text_lines.append(pub_line) - keyboard = [] - if ad.get("url"): - keyboard.append( - [InlineKeyboardButton("View Ad", url=ad["url"])] - ) + return "\n".join(text_lines) - reply_markup: InlineKeyboardMarkup | None = ( - InlineKeyboardMarkup(keyboard) if keyboard else None - ) + +async def notify_new_ad( + bot: ExtBot, + telegram_id: int, + ad: dict[str, Any], +) -> int | None: + text = _format_text("🆕 New listing found!", ad) + reply_markup = _build_keyboard(ad) try: - message = await bot.send_message( - chat_id=telegram_id, - text=text, - reply_markup=reply_markup, - ) + if ad.get("main_image_url"): + message = await bot.send_photo( + chat_id=telegram_id, + photo=ad["main_image_url"], + caption=text, + parse_mode="HTML", + reply_markup=reply_markup, + ) + else: + message = await bot.send_message( + chat_id=telegram_id, + text=text, + parse_mode="HTML", + reply_markup=reply_markup, + ) + logger.info( "Sent notification to %s for ad %s (msg_id=%s)", telegram_id, ad.get("wh_ad_id"), message.message_id, ) + return message.message_id except Exception: logger.exception("Failed to send Telegram notification") + return None -async def mark_notified(pool: asyncpg.Pool, search_query_id: str, ad_id: str) -> None: - await pool.execute( - "UPDATE query_ads SET is_notified = true WHERE search_query_id = $1 AND ad_id = $2", - search_query_id, - ad_id, - ) +async def notify_price_drop( + bot: ExtBot, + telegram_id: int, + ad: dict[str, Any], +) -> int | None: + text = _format_text("⚠️ Price drop!", ad) + reply_markup = _build_keyboard(ad) + + try: + if ad.get("main_image_url"): + message = await bot.send_photo( + chat_id=telegram_id, + photo=ad["main_image_url"], + caption=text, + parse_mode="HTML", + reply_markup=reply_markup, + ) + else: + message = await bot.send_message( + chat_id=telegram_id, + text=text, + parse_mode="HTML", + reply_markup=reply_markup, + ) + + logger.info( + "Sent price drop to %s for ad %s (msg_id=%s)", + telegram_id, + ad.get("wh_ad_id"), + message.message_id, + ) + return message.message_id + except Exception: + logger.exception("Failed to send Telegram notification") + return None async def log_notification(