diff --git a/supabase/migrations/01-init.sql b/supabase/migrations/01-init.sql index 54dd98e..fe7484d 100644 --- a/supabase/migrations/01-init.sql +++ b/supabase/migrations/01-init.sql @@ -10,6 +10,7 @@ CREATE TABLE IF NOT EXISTS users ( telegram_id bigint UNIQUE NOT NULL, username text, first_name text, + is_admin boolean NOT NULL DEFAULT false, is_active boolean NOT NULL DEFAULT true, created_at timestamptz NOT NULL DEFAULT now() ); diff --git a/supabase/migrations/post-boot.sql b/supabase/migrations/post-boot.sql index b5cec10..eda3d0f 100644 --- a/supabase/migrations/post-boot.sql +++ b/supabase/migrations/post-boot.sql @@ -1,8 +1,12 @@ -- ============================================================ -- post-boot — runs after all migrations have been applied. -- Grants INSERT/UPDATE to authenticator on user-facing tables. --- (Run after 01-init.sql so tables exist.) +-- Seeds initial admin user (telegram_id 298181113). -- ============================================================ GRANT INSERT, UPDATE ON search_queries TO authenticator; GRANT INSERT, UPDATE ON notifications TO authenticator; + +INSERT INTO users (telegram_id, username, first_name, is_admin, is_active) +VALUES (298181113, NULL, 'Admin', true, true) +ON CONFLICT (telegram_id) DO NOTHING; diff --git a/worker/src/bot.py b/worker/src/bot.py index 60aab85..15b3838 100644 --- a/worker/src/bot.py +++ b/worker/src/bot.py @@ -1,22 +1,17 @@ -import os import logging import uuid import asyncpg -from telegram import Update -from telegram.ext import ContextTypes, CommandHandler, ExtBot +from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update +from telegram.ext import CommandHandler, ExtBot, ContextTypes from db import get_pool logger = logging.getLogger(__name__) -def _admin_ids() -> set[int]: - raw = os.getenv("ADMIN_TELEGRAM_IDS", "") - return {int(x.strip()) for x in raw.split(",") if x.strip()} - - -async def _require_user(update: Update, context: ContextTypes.DEFAULT_TYPE) -> asyncpg.Row | None: # type: ignore[name-defined] +async def _require_user(update: Update) -> asyncpg.Row | None: # type: ignore[name-defined] + """Look up user by telegram_id. Auto-register on first /start.""" telegram_id = update.effective_user.id pool = await get_pool() row = await pool.fetchrow( @@ -29,6 +24,56 @@ async def _require_user(update: Update, context: ContextTypes.DEFAULT_TYPE) -> a return row +async def _require_admin(update: Update) -> asyncpg.Row | None: # type: ignore[name-defined] + """Require the sender to be a whitelisted admin.""" + telegram_id = update.effective_user.id + pool = await get_pool() + row = await pool.fetchrow( + "SELECT id, is_admin FROM users WHERE telegram_id = $1 AND is_active = true AND is_admin = true", + telegram_id, + ) + if not row: + await update.message.reply_text("Unauthorized — admin only.") # type: ignore[union-attr] + return None + return row + + +async def _auto_register(update: Update) -> asyncpg.Row | None: # type: ignore[name-defined] + """Create user row on first contact if not present. Returns nothing if un-whitelisted.""" + telegram_id = update.effective_user.id + username = update.effective_user.username or None + first_name = update.effective_user.first_name or None + pool = await get_pool() + + existing = await pool.fetchrow( + "SELECT id, is_active FROM users WHERE telegram_id = $1", + telegram_id, + ) + if existing: + # Update name info in case it changed + await pool.execute( + "UPDATE users SET username = $1, first_name = $2 WHERE telegram_id = $3", + username, + first_name, + telegram_id, + ) + if not existing["is_active"]: + await update.message.reply_text("Account deactivated. Contact an admin.") # type: ignore[union-attr] + return None + return existing + + user_uuid = str(uuid.uuid4()) + await pool.execute( + "INSERT INTO users (id, telegram_id, username, first_name) VALUES ($1, $2, $3, $4)", + user_uuid, + telegram_id, + username, + first_name, + ) + logger.info("Auto-registered user %s (%s)", telegram_id, first_name) + return asyncpg.Record(("id", user_uuid), ("is_active", True)) # type: ignore[call-arg] + + def register_handlers(bot: ExtBot) -> None: bot.add_handler(CommandHandler("start", start_handler)) bot.add_handler(CommandHandler("add", add_handler)) @@ -37,19 +82,19 @@ def register_handlers(bot: ExtBot) -> None: bot.add_handler(CommandHandler("resume", resume_handler)) bot.add_handler(CommandHandler("delete", delete_handler)) bot.add_handler(CommandHandler("stats", stats_handler)) + bot.add_handler(CommandHandler("adduser", adduser_handler)) + bot.add_handler(CommandHandler("removeuser", removeuser_handler)) bot.add_handler(CommandHandler("users", users_handler)) # -- /start --------------------------------------------------------------- - async def start_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - row = await _require_user(update, context) + row = await _auto_register(update) if not row: return - user = update.effective_user - name = user.first_name or f"user {user.id}" + name = update.effective_user.first_name or f"user {update.effective_user.id}" await update.message.reply_text( # type: ignore[union-attr] f"Hello {name}! I'll notify you about new willhaben listings.\n\n" "Available commands:\n" @@ -64,26 +109,21 @@ async def start_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> N # -- /add ----------------------------------------------------------------- - async def add_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - row = await _require_user(update, context) + row = await _require_user(update) if not row: return text = (update.message or update.callback_query).text or "" # type: ignore[union-attr] - keyword = text.split(" ", 1) - if len(keyword) < 2 or not keyword[1].strip().strip("'\""): + parts = text.split(" ", 1) + if len(parts) < 2 or not parts[1].strip().strip("'\""): await update.message.reply_text("Usage: /add ") # type: ignore[union-attr] return - keyword = keyword[1].strip().strip("'\"") - if not keyword: - await update.message.reply_text("Please provide a non-empty keyword.") # type: ignore[union-attr] - return - + keyword = parts[1].strip().strip("'\"") pool = await get_pool() existing = await pool.fetchrow( - "SELECT id FROM search_queries WHERE user_id = $1 AND keyword = $2", + "SELECT id FROM search_queries WHERE user_id = $1 AND keyword ILIKE $2", row["id"], keyword, ) @@ -91,29 +131,25 @@ async def add_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> Non await update.message.reply_text(f"Query already exists: {keyword}") # type: ignore[union-attr] return - interval = int(os.getenv("DEFAULT_INTERVAL_MINUTES", "60")) query_id = str(uuid.uuid4()) await pool.execute( - "INSERT INTO search_queries (id, user_id, keyword, interval_minutes) VALUES ($1, $2, $3, $4)", + "INSERT INTO search_queries (id, user_id, keyword) VALUES ($1, $2, $3)", query_id, row["id"], keyword, - interval, ) await update.message.reply_text( # type: ignore[union-attr] - f"✓ Tracking \"{keyword}\"\n" - f"Query ID: {query_id}\n" - f"Check interval: every {interval} minutes", + f"Tracking \"{keyword}\"\n" + f"Query ID: {query_id}", ) logger.info("User %s added query '%s' (%s)", update.effective_user.id, keyword, query_id) # -- /list ---------------------------------------------------------------- - async def list_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - row = await _require_user(update, context) + row = await _require_user(update) if not row: return @@ -144,9 +180,8 @@ async def list_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No # -- /pause --------------------------------------------------------------- - async def pause_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - row = await _require_user(update, context) + row = await _require_user(update) if not row: return @@ -174,15 +209,13 @@ async def pause_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> N await update.message.reply_text("Query not found or access denied.") # type: ignore[union-attr] return - await update.message.reply_text(f"✓ Query paused: {query_id}") # type: ignore[union-attr] - logger.info("User %s paused query %s", update.effective_user.id, query_id) + await update.message.reply_text(f"Query paused: {query_id}") # type: ignore[union-attr] # -- /resume -------------------------------------------------------------- - async def resume_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - row = await _require_user(update, context) + row = await _require_user(update) if not row: return @@ -210,15 +243,13 @@ async def resume_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> await update.message.reply_text("Query not found or access denied.") # type: ignore[union-attr] return - await update.message.reply_text(f"✓ Query resumed: {query_id}") # type: ignore[union-attr] - logger.info("User %s resumed query %s", update.effective_user.id, query_id) + await update.message.reply_text(f"Query resumed: {query_id}") # type: ignore[union-attr] # -- /delete -------------------------------------------------------------- - async def delete_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - row = await _require_user(update, context) + row = await _require_user(update) if not row: return @@ -246,15 +277,13 @@ async def delete_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> await update.message.reply_text("Query not found or access denied.") # type: ignore[union-attr] return - await update.message.reply_text(f"✓ Query deleted: {query_id}") # type: ignore[union-attr] - logger.info("User %s deleted query %s", update.effective_user.id, query_id) + await update.message.reply_text(f"Query deleted: {query_id}") # type: ignore[union-attr] # -- /stats --------------------------------------------------------------- - async def stats_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - row = await _require_user(update, context) + row = await _require_user(update) if not row: return @@ -278,7 +307,7 @@ async def stats_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> N ) text = ( - f"📊 Your stats:\n\n" + f"Your stats:\n\n" f"Queries: {total_queries}\n" f"Ads tracked: {total_ads}\n" f"Notifications sent: {total_notifications}" @@ -286,27 +315,101 @@ async def stats_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> N await update.message.reply_text(text) # type: ignore[union-attr] -# -- /users (admin only) -------------------------------------------------- +# -- /adduser (admin only) ------------------------------------------------ + +async def adduser_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + admin_row = await _require_admin(update) + if not admin_row: + return + + text = (update.message or update.callback_query).text or "" # type: ignore[union-attr] + parts = text.split() + if len(parts) < 2: + await update.message.reply_text("Usage: /adduser [admin]") # type: ignore[union-attr] + return + + try: + telegram_id = int(parts[1]) + except ValueError: + await update.message.reply_text("Invalid Telegram ID (must be numeric).") # type: ignore[union-attr] + return + + is_admin = parts[-1].lower() == "admin" if len(parts) >= 3 else False + pool = await get_pool() + + user_uuid = str(uuid.uuid4()) + await pool.execute( + "INSERT INTO users (id, telegram_id, is_admin) VALUES ($1, $2, $3) " + "ON CONFLICT (telegram_id) DO UPDATE SET is_admin = EXCLUDED.is_admin", + user_uuid, + telegram_id, + is_admin, + ) + + role = "admin" if is_admin else "user" + await update.message.reply_text( # type: ignore[union-attr] + f"Added Telegram ID {telegram_id} as {role}.", + ) + logger.info("Admin %s added user %d (is_admin=%s)", update.effective_user.id, telegram_id, is_admin) -async def users_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - if update.effective_user.id not in _admin_ids(): - await update.message.reply_text("Unauthorized") # type: ignore[union-attr] +# -- /removeuser (admin only) --------------------------------------------- + +async def removeuser_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + admin_row = await _require_admin(update) + if not admin_row: + return + + text = (update.message or update.callback_query).text or "" # type: ignore[union-attr] + parts = text.split() + if len(parts) < 2: + await update.message.reply_text("Usage: /removeuser ") # type: ignore[union-attr] + return + + try: + telegram_id = int(parts[1]) + except ValueError: + await update.message.reply_text("Invalid Telegram ID (must be numeric).") # type: ignore[union-attr] return pool = await get_pool() - users = await pool.fetch( - "SELECT telegram_id, username, first_name, is_active, created_at FROM users ORDER BY created_at DESC" + result = await pool.execute( + "DELETE FROM users WHERE telegram_id = $1", + telegram_id, ) - if not users: + if "0" in result: + await update.message.reply_text(f"No user found with Telegram ID {telegram_id}.") # type: ignore[union-attr] + else: + await update.message.reply_text( # type: ignore[union-attr] + f"Removed Telegram ID {telegram_id}.", + ) + + +# -- /users (admin only) -------------------------------------------------- + +async def users_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + admin_row = await _require_admin(update) + if not admin_row: + return + + pool = await get_pool() + users_list = await pool.fetch( + "SELECT telegram_id, username, first_name, is_admin, is_active, created_at " + "FROM users ORDER BY created_at DESC" + ) + + if not users_list: await update.message.reply_text("No users registered.") # type: ignore[union-attr] return lines = [] - for u in users: + for u in users_list: + role = "[admin]" if u["is_admin"] else "" status = "active" if u["is_active"] else "inactive" - name = u["username"] or u["first_name"] or "unknown" - lines.append(f"• {u['telegram_id']} | @{name} | {status}") + name = u["username"] or u["first_name"] or str(u["telegram_id"]) + lines.append(f"{u['telegram_id']} | {name} | {status} {role}") - await update.message.reply_text("👥 Registered users:\n\n" + "\n".join(lines)) # type: ignore[union-attr] + await update.message.reply_text( # type: ignore[union-attr] + "Registered users:\n\n" + "\n".join(lines), + )