Files
willhaben-tracker/worker/src/bot.py
T

416 lines
14 KiB
Python

import logging
import uuid
import asyncpg
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram.ext import CommandHandler, ExtBot, ContextTypes
from db import get_pool
# Module-level logger, named after this module so records can be filtered per file.
logger = logging.getLogger(__name__)
async def _require_user(update: Update) -> asyncpg.Record | None:
    """Return the sender's ``users`` row if they are registered and active.

    The sender is looked up by Telegram ID. Nothing is inserted here;
    rows are created by ``_auto_register`` (on /start) or by ``/adduser``.

    Returns:
        The ``(id, is_active)`` record for an active user. ``None`` when the
        user is unknown or inactive (an access-denied reply is sent first),
        or when the update carries no sender at all.
    """
    sender = update.effective_user
    if sender is None:
        # effective_user is optional in python-telegram-bot; nobody to authorize.
        return None

    pool = await get_pool()
    row = await pool.fetchrow(
        "SELECT id, is_active FROM users WHERE telegram_id = $1",
        sender.id,
    )
    if row and row["is_active"]:
        return row

    # effective_message also covers edited commands, where update.message is None.
    message = update.effective_message
    if message is not None:
        await message.reply_text("Access denied. This bot requires an invitation.")
    return None
async def _require_admin(update: Update) -> asyncpg.Record | None:
    """Return the sender's ``users`` row if they are an active admin.

    Returns:
        The ``(id, is_admin)`` record when the sender is both active and an
        admin. ``None`` otherwise (an "Unauthorized" reply is sent first),
        or when the update carries no sender at all.
    """
    sender = update.effective_user
    if sender is None:
        # effective_user is optional in python-telegram-bot; nobody to authorize.
        return None

    pool = await get_pool()
    row = await pool.fetchrow(
        "SELECT id, is_admin FROM users WHERE telegram_id = $1 AND is_active = true AND is_admin = true",
        sender.id,
    )
    if row:
        return row

    # effective_message also covers edited commands, where update.message is None.
    message = update.effective_message
    if message is not None:
        await message.reply_text("Unauthorized — admin only.")
    return None
async def _auto_register(update: Update) -> asyncpg.Record | None:
    """Ensure a ``users`` row exists for the sender and return it.

    For a known user the stored ``username`` / ``first_name`` are refreshed.
    A known but inactive user gets an "Account deactivated" reply and
    ``None`` is returned. An unknown user is inserted and the new row is
    returned.

    Returns:
        An ``(id, is_active)`` record, or ``None`` for a deactivated account
        or an update without a sender.
    """
    sender = update.effective_user
    if sender is None:
        # effective_user is optional in python-telegram-bot; nothing to register.
        return None

    # Normalise empty strings to NULL so the columns stay consistent.
    username = sender.username or None
    first_name = sender.first_name or None

    pool = await get_pool()
    existing = await pool.fetchrow(
        "SELECT id, is_active FROM users WHERE telegram_id = $1",
        sender.id,
    )
    if existing:
        # Update name info in case it changed
        await pool.execute(
            "UPDATE users SET username = $1, first_name = $2 WHERE telegram_id = $3",
            username,
            first_name,
            sender.id,
        )
        if existing["is_active"]:
            return existing
        # effective_message also covers edited commands, where update.message is None.
        message = update.effective_message
        if message is not None:
            await message.reply_text("Account deactivated. Contact an admin.")
        return None

    # asyncpg.Record cannot be built by hand from Python, so let the database
    # hand back the freshly inserted row instead of fabricating one.
    # NOTE(review): is_active now reflects the column default rather than a
    # hard-coded True; the caller only checks truthiness of the row — confirm
    # whether brand-new users are meant to be active.
    created = await pool.fetchrow(
        "INSERT INTO users (id, telegram_id, username, first_name) VALUES ($1, $2, $3, $4) "
        "RETURNING id, is_active",
        str(uuid.uuid4()),
        sender.id,
        username,
        first_name,
    )
    logger.info("Auto-registered user %s (%s)", sender.id, first_name)
    return created
def register_handlers(bot: ExtBot) -> None:
    """Attach one ``CommandHandler`` per bot command to ``bot``.

    Handlers are added in the order listed below.
    """
    # NOTE(review): add_handler is an Application method in python-telegram-bot
    # v20+, while the annotation says ExtBot — confirm what callers pass in.
    command_table = (
        ("start", start_handler),
        ("add", add_handler),
        ("list", list_handler),
        ("pause", pause_handler),
        ("resume", resume_handler),
        ("delete", delete_handler),
        ("stats", stats_handler),
        ("adduser", adduser_handler),
        ("removeuser", removeuser_handler),
        ("users", users_handler),
    )
    for command, callback in command_table:
        bot.add_handler(CommandHandler(command, callback))
# -- /start ---------------------------------------------------------------
async def start_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /start: register or refresh the sender, then send the command overview."""
    row = await _auto_register(update)
    if not row:
        return

    sender = update.effective_user
    # effective_message also covers edited commands, where update.message is None.
    message = update.effective_message
    if sender is None or message is None:
        return

    # Fall back to the numeric ID when Telegram supplies no first name.
    name = sender.first_name or f"user {sender.id}"
    await message.reply_text(
        f"Hello {name}! I'll notify you about new willhaben listings.\n\n"
        "Available commands:\n"
        "/add <keyword> — Start tracking a keyword\n"
        "/list — List your active searches\n"
        "/pause <query_id> — Pause a search\n"
        "/resume <query_id> — Resume a paused search\n"
        "/delete <query_id> — Delete a search\n"
        "/stats — View your tracking statistics",
    )
# -- /add -----------------------------------------------------------------
async def add_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /add <keyword>: start tracking a keyword for the sender.

    The keyword is everything after the command, with surrounding whitespace
    and quote characters removed. A keyword the user already tracks
    (compared case-insensitively) is rejected.
    """
    row = await _require_user(update)
    if not row:
        return

    # effective_message also covers edited commands, where update.message is None.
    message = update.effective_message
    if message is None:
        return

    # split(None, 1): any whitespace (space, tab, newline) separates the
    # command from its argument.
    parts = (message.text or "").split(None, 1)
    keyword = parts[1].strip().strip("'\"").strip() if len(parts) == 2 else ""
    if not keyword:
        await message.reply_text("Usage: /add <keyword>")
        return

    pool = await get_pool()
    # Plain case-insensitive equality: ILIKE would treat '%' and '_' inside
    # the user's keyword as wildcards and report false duplicates.
    existing = await pool.fetchrow(
        "SELECT id FROM search_queries WHERE user_id = $1 AND LOWER(keyword) = LOWER($2)",
        row["id"],
        keyword,
    )
    if existing:
        await message.reply_text(f"Query already exists: {keyword}")
        return

    query_id = str(uuid.uuid4())
    await pool.execute(
        "INSERT INTO search_queries (id, user_id, keyword) VALUES ($1, $2, $3)",
        query_id,
        row["id"],
        keyword,
    )
    await message.reply_text(
        f"Tracking \"{keyword}\"\n"
        f"Query ID: {query_id}",
    )
    sender = update.effective_user
    logger.info(
        "User %s added query '%s' (%s)",
        sender.id if sender else None,
        keyword,
        query_id,
    )
# -- /list ----------------------------------------------------------------
async def list_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /list: show all of the sender's searches, newest first.

    Both active and paused searches are listed, each with its ID, interval,
    status and last scrape time.
    """
    row = await _require_user(update)
    if not row:
        return

    # effective_message also covers edited commands, where update.message is None.
    message = update.effective_message
    if message is None:
        return

    pool = await get_pool()
    queries = await pool.fetch(
        "SELECT id, keyword, interval_minutes, is_active, last_scraped_at "
        "FROM search_queries WHERE user_id = $1 ORDER BY created_at DESC",
        row["id"],
    )
    if not queries:
        await message.reply_text("No active searches. Use /add <keyword> to start.")
        return

    entries = []
    for position, query in enumerate(queries, 1):
        status = "active" if query["is_active"] else "paused"
        scraped_at = query["last_scraped_at"]
        # last_scraped_at is NULL until the query has been scraped once.
        last = scraped_at.strftime("%d.%m.%Y %H:%M") if scraped_at else "never"
        entries.append(
            f"{position}. \"{query['keyword']}\"\n"
            f" ID: {query['id']}\n"
            f" Interval: {query['interval_minutes']} min | Status: {status}\n"
            f" Last scraped: {last}"
        )
    await message.reply_text("\n\n".join(entries))
# -- /pause ---------------------------------------------------------------
async def pause_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /pause <query_id>: deactivate one of the sender's searches."""
    row = await _require_user(update)
    if not row:
        return

    # effective_message also covers edited commands, where update.message is None.
    message = update.effective_message
    if message is None:
        return

    parts = (message.text or "").split()
    if len(parts) < 2:
        await message.reply_text("Usage: /pause <query_id>")
        return

    try:
        # Normalise to the canonical lowercase-hyphenated form so the value
        # matches how IDs are generated (str(uuid.uuid4())).
        query_id = str(uuid.UUID(parts[1]))
    except ValueError:
        await message.reply_text("Invalid query ID format.")
        return

    pool = await get_pool()
    # The user_id condition keeps users from touching each other's queries.
    status = await pool.execute(
        "UPDATE search_queries SET is_active = false WHERE id = $1 AND user_id = $2",
        query_id,
        row["id"],
    )
    # execute() returns a status tag such as "UPDATE 1"; the last token is
    # the affected-row count.
    if int(status.split()[-1]) == 0:
        await message.reply_text("Query not found or access denied.")
        return
    await message.reply_text(f"Query paused: {query_id}")
# -- /resume --------------------------------------------------------------
async def resume_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /resume <query_id>: reactivate one of the sender's searches."""
    row = await _require_user(update)
    if not row:
        return

    # effective_message also covers edited commands, where update.message is None.
    message = update.effective_message
    if message is None:
        return

    parts = (message.text or "").split()
    if len(parts) < 2:
        await message.reply_text("Usage: /resume <query_id>")
        return

    try:
        # Normalise to the canonical lowercase-hyphenated form so the value
        # matches how IDs are generated (str(uuid.uuid4())).
        query_id = str(uuid.UUID(parts[1]))
    except ValueError:
        await message.reply_text("Invalid query ID format.")
        return

    pool = await get_pool()
    # The user_id condition keeps users from touching each other's queries.
    status = await pool.execute(
        "UPDATE search_queries SET is_active = true WHERE id = $1 AND user_id = $2",
        query_id,
        row["id"],
    )
    # execute() returns a status tag such as "UPDATE 1"; the last token is
    # the affected-row count.
    if int(status.split()[-1]) == 0:
        await message.reply_text("Query not found or access denied.")
        return
    await message.reply_text(f"Query resumed: {query_id}")
# -- /delete --------------------------------------------------------------
async def delete_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /delete <query_id>: permanently remove one of the sender's searches."""
    row = await _require_user(update)
    if not row:
        return

    # effective_message also covers edited commands, where update.message is None.
    message = update.effective_message
    if message is None:
        return

    parts = (message.text or "").split()
    if len(parts) < 2:
        await message.reply_text("Usage: /delete <query_id>")
        return

    try:
        # Normalise to the canonical lowercase-hyphenated form so the value
        # matches how IDs are generated (str(uuid.uuid4())).
        query_id = str(uuid.UUID(parts[1]))
    except ValueError:
        await message.reply_text("Invalid query ID format.")
        return

    pool = await get_pool()
    # The user_id condition keeps users from deleting each other's queries.
    status = await pool.execute(
        "DELETE FROM search_queries WHERE id = $1 AND user_id = $2",
        query_id,
        row["id"],
    )
    # execute() returns a status tag such as "DELETE 1"; the last token is
    # the affected-row count.
    if int(status.split()[-1]) == 0:
        await message.reply_text("Query not found or access denied.")
        return
    await message.reply_text(f"Query deleted: {query_id}")
# -- /stats ---------------------------------------------------------------
async def stats_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /stats: report the sender's query, ad and notification counts."""
    row = await _require_user(update)
    if not row:
        return

    # effective_message also covers edited commands, where update.message is None.
    message = update.effective_message
    if message is None:
        return

    user_id = row["id"]
    pool = await get_pool()
    total_queries = await pool.fetchval(
        "SELECT COUNT(*) FROM search_queries WHERE user_id = $1",
        user_id,
    )
    # DISTINCT: an ad linked to several of the user's queries counts once.
    total_ads = await pool.fetchval(
        "SELECT COUNT(DISTINCT qa.ad_id) "
        "FROM query_ads qa JOIN search_queries sq ON qa.search_query_id = sq.id "
        "WHERE sq.user_id = $1",
        user_id,
    )
    total_notifications = await pool.fetchval(
        "SELECT COUNT(*) FROM notifications WHERE user_id = $1",
        user_id,
    )

    text = (
        f"Your stats:\n\n"
        f"Queries: {total_queries}\n"
        f"Ads tracked: {total_ads}\n"
        f"Notifications sent: {total_notifications}"
    )
    await message.reply_text(text)
# -- /adduser (admin only) ------------------------------------------------
async def adduser_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /adduser <telegram_id> [admin]: create a user or set their role.

    Admin only. If the Telegram ID already exists, only ``is_admin`` is
    overwritten, so omitting the ``admin`` keyword demotes an existing admin.
    """
    admin_row = await _require_admin(update)
    if not admin_row:
        return

    # effective_message also covers edited commands, where update.message is None.
    message = update.effective_message
    if message is None:
        return

    parts = (message.text or "").split()
    if len(parts) < 2:
        await message.reply_text("Usage: /adduser <telegram_id> [admin]")
        return

    try:
        telegram_id = int(parts[1])
    except ValueError:
        await message.reply_text("Invalid Telegram ID (must be numeric).")
        return

    # The role keyword is taken from the last token of the command.
    is_admin = len(parts) >= 3 and parts[-1].lower() == "admin"

    pool = await get_pool()
    # NOTE(review): the upsert never touches is_active, so an existing
    # inactive row stays inactive — confirm this is intended.
    await pool.execute(
        "INSERT INTO users (id, telegram_id, is_admin) VALUES ($1, $2, $3) "
        "ON CONFLICT (telegram_id) DO UPDATE SET is_admin = EXCLUDED.is_admin",
        str(uuid.uuid4()),
        telegram_id,
        is_admin,
    )

    role = "admin" if is_admin else "user"
    await message.reply_text(
        f"Added Telegram ID {telegram_id} as {role}.",
    )
    sender = update.effective_user
    logger.info(
        "Admin %s added user %d (is_admin=%s)",
        sender.id if sender else None,
        telegram_id,
        is_admin,
    )
# -- /removeuser (admin only) ---------------------------------------------
async def removeuser_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /removeuser <telegram_id>: delete a user row. Admin only."""
    admin_row = await _require_admin(update)
    if not admin_row:
        return

    # effective_message also covers edited commands, where update.message is None.
    message = update.effective_message
    if message is None:
        return

    parts = (message.text or "").split()
    if len(parts) < 2:
        await message.reply_text("Usage: /removeuser <telegram_id>")
        return

    try:
        telegram_id = int(parts[1])
    except ValueError:
        await message.reply_text("Invalid Telegram ID (must be numeric).")
        return

    pool = await get_pool()
    status = await pool.execute(
        "DELETE FROM users WHERE telegram_id = $1",
        telegram_id,
    )
    # execute() returns a status tag such as "DELETE 1"; parse the count
    # rather than searching the string for a digit.
    if int(status.split()[-1]) == 0:
        await message.reply_text(f"No user found with Telegram ID {telegram_id}.")
        return

    await message.reply_text(
        f"Removed Telegram ID {telegram_id}.",
    )
    sender = update.effective_user
    logger.info("Admin %s removed user %d", sender.id if sender else None, telegram_id)
# -- /users (admin only) --------------------------------------------------
async def users_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /users: list every registered user, newest first. Admin only."""
    admin_row = await _require_admin(update)
    if not admin_row:
        return

    # effective_message also covers edited commands, where update.message is None.
    message = update.effective_message
    if message is None:
        return

    pool = await get_pool()
    users_list = await pool.fetch(
        "SELECT telegram_id, username, first_name, is_admin, is_active, created_at "
        "FROM users ORDER BY created_at DESC"
    )
    if not users_list:
        await message.reply_text("No users registered.")
        return

    lines = []
    for user in users_list:
        role = "[admin]" if user["is_admin"] else ""
        status = "active" if user["is_active"] else "inactive"
        # Prefer the username, then the first name, then the raw Telegram ID.
        name = user["username"] or user["first_name"] or str(user["telegram_id"])
        lines.append(f"{user['telegram_id']} | {name} | {status} {role}")

    await message.reply_text(
        "Registered users:\n\n" + "\n".join(lines),
    )