feat: python worker (bot, scraper, notifier, scheduler)
This commit is contained in:
@@ -0,0 +1,6 @@
|
|||||||
|
FROM python:3.12-slim
|
||||||
|
WORKDIR /app
|
||||||
|
COPY requirements.txt .
|
||||||
|
RUN pip install --no-cache-dir -r requirements.txt
|
||||||
|
COPY src/ .
|
||||||
|
CMD ["python", "main.py"]
|
||||||
@@ -0,0 +1,4 @@
|
|||||||
|
python-telegram-bot==21.4
|
||||||
|
asyncpg==0.30.0
|
||||||
|
httpx==0.27.2
|
||||||
|
python-dotenv==1.0.1
|
||||||
@@ -0,0 +1,312 @@
|
|||||||
|
import os
|
||||||
|
import logging
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
import asyncpg
|
||||||
|
from telegram import Update
|
||||||
|
from telegram.ext import ContextTypes, CommandHandler, ExtBot
|
||||||
|
|
||||||
|
from db import get_pool
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def _admin_ids() -> set[int]:
|
||||||
|
raw = os.getenv("ADMIN_TELEGRAM_IDS", "")
|
||||||
|
return {int(x.strip()) for x in raw.split(",") if x.strip()}
|
||||||
|
|
||||||
|
|
||||||
|
async def _require_user(update: Update, context: ContextTypes.DEFAULT_TYPE) -> asyncpg.Row | None: # type: ignore[name-defined]
|
||||||
|
telegram_id = update.effective_user.id
|
||||||
|
pool = await get_pool()
|
||||||
|
row = await pool.fetchrow(
|
||||||
|
"SELECT id, is_active FROM users WHERE telegram_id = $1",
|
||||||
|
telegram_id,
|
||||||
|
)
|
||||||
|
if not row or not row["is_active"]:
|
||||||
|
await update.message.reply_text("Access denied. This bot requires an invitation.") # type: ignore[union-attr]
|
||||||
|
return None
|
||||||
|
return row
|
||||||
|
|
||||||
|
|
||||||
|
def register_handlers(bot: ExtBot) -> None:
|
||||||
|
bot.add_handler(CommandHandler("start", start_handler))
|
||||||
|
bot.add_handler(CommandHandler("add", add_handler))
|
||||||
|
bot.add_handler(CommandHandler("list", list_handler))
|
||||||
|
bot.add_handler(CommandHandler("pause", pause_handler))
|
||||||
|
bot.add_handler(CommandHandler("resume", resume_handler))
|
||||||
|
bot.add_handler(CommandHandler("delete", delete_handler))
|
||||||
|
bot.add_handler(CommandHandler("stats", stats_handler))
|
||||||
|
bot.add_handler(CommandHandler("users", users_handler))
|
||||||
|
|
||||||
|
|
||||||
|
# -- /start ---------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
async def start_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||||
|
row = await _require_user(update, context)
|
||||||
|
if not row:
|
||||||
|
return
|
||||||
|
|
||||||
|
user = update.effective_user
|
||||||
|
name = user.first_name or f"user {user.id}"
|
||||||
|
await update.message.reply_text( # type: ignore[union-attr]
|
||||||
|
f"Hello {name}! I'll notify you about new willhaben listings.\n\n"
|
||||||
|
"Available commands:\n"
|
||||||
|
"/add <keyword> — Start tracking a keyword\n"
|
||||||
|
"/list — List your active searches\n"
|
||||||
|
"/pause <query_id> — Pause a search\n"
|
||||||
|
"/resume <query_id> — Resume a paused search\n"
|
||||||
|
"/delete <query_id> — Delete a search\n"
|
||||||
|
"/stats — View your tracking statistics",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# -- /add -----------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
async def add_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||||
|
row = await _require_user(update, context)
|
||||||
|
if not row:
|
||||||
|
return
|
||||||
|
|
||||||
|
text = (update.message or update.callback_query).text or "" # type: ignore[union-attr]
|
||||||
|
keyword = text.split(" ", 1)
|
||||||
|
if len(keyword) < 2 or not keyword[1].strip().strip("'\""):
|
||||||
|
await update.message.reply_text("Usage: /add <keyword>") # type: ignore[union-attr]
|
||||||
|
return
|
||||||
|
|
||||||
|
keyword = keyword[1].strip().strip("'\"")
|
||||||
|
if not keyword:
|
||||||
|
await update.message.reply_text("Please provide a non-empty keyword.") # type: ignore[union-attr]
|
||||||
|
return
|
||||||
|
|
||||||
|
pool = await get_pool()
|
||||||
|
existing = await pool.fetchrow(
|
||||||
|
"SELECT id FROM search_queries WHERE user_id = $1 AND keyword = $2",
|
||||||
|
row["id"],
|
||||||
|
keyword,
|
||||||
|
)
|
||||||
|
if existing:
|
||||||
|
await update.message.reply_text(f"Query already exists: {keyword}") # type: ignore[union-attr]
|
||||||
|
return
|
||||||
|
|
||||||
|
interval = int(os.getenv("DEFAULT_INTERVAL_MINUTES", "60"))
|
||||||
|
query_id = str(uuid.uuid4())
|
||||||
|
await pool.execute(
|
||||||
|
"INSERT INTO search_queries (id, user_id, keyword, interval_minutes) VALUES ($1, $2, $3, $4)",
|
||||||
|
query_id,
|
||||||
|
row["id"],
|
||||||
|
keyword,
|
||||||
|
interval,
|
||||||
|
)
|
||||||
|
|
||||||
|
await update.message.reply_text( # type: ignore[union-attr]
|
||||||
|
f"✓ Tracking \"{keyword}\"\n"
|
||||||
|
f"Query ID: {query_id}\n"
|
||||||
|
f"Check interval: every {interval} minutes",
|
||||||
|
)
|
||||||
|
logger.info("User %s added query '%s' (%s)", update.effective_user.id, keyword, query_id)
|
||||||
|
|
||||||
|
|
||||||
|
# -- /list ----------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
async def list_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||||
|
row = await _require_user(update, context)
|
||||||
|
if not row:
|
||||||
|
return
|
||||||
|
|
||||||
|
pool = await get_pool()
|
||||||
|
queries = await pool.fetch(
|
||||||
|
"SELECT id, keyword, interval_minutes, is_active, last_scraped_at "
|
||||||
|
"FROM search_queries WHERE user_id = $1 ORDER BY created_at DESC",
|
||||||
|
row["id"],
|
||||||
|
)
|
||||||
|
|
||||||
|
if not queries:
|
||||||
|
await update.message.reply_text("No active searches. Use /add <keyword> to start.") # type: ignore[union-attr]
|
||||||
|
return
|
||||||
|
|
||||||
|
lines = []
|
||||||
|
for i, q in enumerate(queries, 1):
|
||||||
|
status = "active" if q["is_active"] else "paused"
|
||||||
|
last = q["last_scraped_at"].strftime("%d.%m.%Y %H:%M") if q["last_scraped_at"] else "never"
|
||||||
|
lines.append(
|
||||||
|
f"{i}. \"{q['keyword']}\"\n"
|
||||||
|
f" ID: {q['id']}\n"
|
||||||
|
f" Interval: {q['interval_minutes']} min | Status: {status}\n"
|
||||||
|
f" Last scraped: {last}"
|
||||||
|
)
|
||||||
|
|
||||||
|
await update.message.reply_text("\n\n".join(lines)) # type: ignore[union-attr]
|
||||||
|
|
||||||
|
|
||||||
|
# -- /pause ---------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
async def pause_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||||
|
row = await _require_user(update, context)
|
||||||
|
if not row:
|
||||||
|
return
|
||||||
|
|
||||||
|
text = (update.message or update.callback_query).text or "" # type: ignore[union-attr]
|
||||||
|
parts = text.split()
|
||||||
|
if len(parts) < 2:
|
||||||
|
await update.message.reply_text("Usage: /pause <query_id>") # type: ignore[union-attr]
|
||||||
|
return
|
||||||
|
|
||||||
|
query_id = parts[1].strip()
|
||||||
|
try:
|
||||||
|
uuid.UUID(query_id)
|
||||||
|
except ValueError:
|
||||||
|
await update.message.reply_text("Invalid query ID format.") # type: ignore[union-attr]
|
||||||
|
return
|
||||||
|
|
||||||
|
pool = await get_pool()
|
||||||
|
result = await pool.execute(
|
||||||
|
"UPDATE search_queries SET is_active = false WHERE id = $1 AND user_id = $2",
|
||||||
|
query_id,
|
||||||
|
row["id"],
|
||||||
|
)
|
||||||
|
|
||||||
|
if "1" not in result:
|
||||||
|
await update.message.reply_text("Query not found or access denied.") # type: ignore[union-attr]
|
||||||
|
return
|
||||||
|
|
||||||
|
await update.message.reply_text(f"✓ Query paused: {query_id}") # type: ignore[union-attr]
|
||||||
|
logger.info("User %s paused query %s", update.effective_user.id, query_id)
|
||||||
|
|
||||||
|
|
||||||
|
# -- /resume --------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
async def resume_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||||
|
row = await _require_user(update, context)
|
||||||
|
if not row:
|
||||||
|
return
|
||||||
|
|
||||||
|
text = (update.message or update.callback_query).text or "" # type: ignore[union-attr]
|
||||||
|
parts = text.split()
|
||||||
|
if len(parts) < 2:
|
||||||
|
await update.message.reply_text("Usage: /resume <query_id>") # type: ignore[union-attr]
|
||||||
|
return
|
||||||
|
|
||||||
|
query_id = parts[1].strip()
|
||||||
|
try:
|
||||||
|
uuid.UUID(query_id)
|
||||||
|
except ValueError:
|
||||||
|
await update.message.reply_text("Invalid query ID format.") # type: ignore[union-attr]
|
||||||
|
return
|
||||||
|
|
||||||
|
pool = await get_pool()
|
||||||
|
result = await pool.execute(
|
||||||
|
"UPDATE search_queries SET is_active = true WHERE id = $1 AND user_id = $2",
|
||||||
|
query_id,
|
||||||
|
row["id"],
|
||||||
|
)
|
||||||
|
|
||||||
|
if "1" not in result:
|
||||||
|
await update.message.reply_text("Query not found or access denied.") # type: ignore[union-attr]
|
||||||
|
return
|
||||||
|
|
||||||
|
await update.message.reply_text(f"✓ Query resumed: {query_id}") # type: ignore[union-attr]
|
||||||
|
logger.info("User %s resumed query %s", update.effective_user.id, query_id)
|
||||||
|
|
||||||
|
|
||||||
|
# -- /delete --------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
async def delete_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||||
|
row = await _require_user(update, context)
|
||||||
|
if not row:
|
||||||
|
return
|
||||||
|
|
||||||
|
text = (update.message or update.callback_query).text or "" # type: ignore[union-attr]
|
||||||
|
parts = text.split()
|
||||||
|
if len(parts) < 2:
|
||||||
|
await update.message.reply_text("Usage: /delete <query_id>") # type: ignore[union-attr]
|
||||||
|
return
|
||||||
|
|
||||||
|
query_id = parts[1].strip()
|
||||||
|
try:
|
||||||
|
uuid.UUID(query_id)
|
||||||
|
except ValueError:
|
||||||
|
await update.message.reply_text("Invalid query ID format.") # type: ignore[union-attr]
|
||||||
|
return
|
||||||
|
|
||||||
|
pool = await get_pool()
|
||||||
|
result = await pool.execute(
|
||||||
|
"DELETE FROM search_queries WHERE id = $1 AND user_id = $2",
|
||||||
|
query_id,
|
||||||
|
row["id"],
|
||||||
|
)
|
||||||
|
|
||||||
|
if "1" not in result:
|
||||||
|
await update.message.reply_text("Query not found or access denied.") # type: ignore[union-attr]
|
||||||
|
return
|
||||||
|
|
||||||
|
await update.message.reply_text(f"✓ Query deleted: {query_id}") # type: ignore[union-attr]
|
||||||
|
logger.info("User %s deleted query %s", update.effective_user.id, query_id)
|
||||||
|
|
||||||
|
|
||||||
|
# -- /stats ---------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
async def stats_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||||
|
row = await _require_user(update, context)
|
||||||
|
if not row:
|
||||||
|
return
|
||||||
|
|
||||||
|
pool = await get_pool()
|
||||||
|
|
||||||
|
total_queries = await pool.fetchval(
|
||||||
|
"SELECT COUNT(*) FROM search_queries WHERE user_id = $1",
|
||||||
|
row["id"],
|
||||||
|
)
|
||||||
|
|
||||||
|
total_ads = await pool.fetchval(
|
||||||
|
"SELECT COUNT(DISTINCT qa.ad_id) "
|
||||||
|
"FROM query_ads qa JOIN search_queries sq ON qa.search_query_id = sq.id "
|
||||||
|
"WHERE sq.user_id = $1",
|
||||||
|
row["id"],
|
||||||
|
)
|
||||||
|
|
||||||
|
total_notifications = await pool.fetchval(
|
||||||
|
"SELECT COUNT(*) FROM notifications WHERE user_id = $1",
|
||||||
|
row["id"],
|
||||||
|
)
|
||||||
|
|
||||||
|
text = (
|
||||||
|
f"📊 Your stats:\n\n"
|
||||||
|
f"Queries: {total_queries}\n"
|
||||||
|
f"Ads tracked: {total_ads}\n"
|
||||||
|
f"Notifications sent: {total_notifications}"
|
||||||
|
)
|
||||||
|
await update.message.reply_text(text) # type: ignore[union-attr]
|
||||||
|
|
||||||
|
|
||||||
|
# -- /users (admin only) --------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
async def users_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||||
|
if update.effective_user.id not in _admin_ids():
|
||||||
|
await update.message.reply_text("Unauthorized") # type: ignore[union-attr]
|
||||||
|
return
|
||||||
|
|
||||||
|
pool = await get_pool()
|
||||||
|
users = await pool.fetch(
|
||||||
|
"SELECT telegram_id, username, first_name, is_active, created_at FROM users ORDER BY created_at DESC"
|
||||||
|
)
|
||||||
|
|
||||||
|
if not users:
|
||||||
|
await update.message.reply_text("No users registered.") # type: ignore[union-attr]
|
||||||
|
return
|
||||||
|
|
||||||
|
lines = []
|
||||||
|
for u in users:
|
||||||
|
status = "active" if u["is_active"] else "inactive"
|
||||||
|
name = u["username"] or u["first_name"] or "unknown"
|
||||||
|
lines.append(f"• {u['telegram_id']} | @{name} | {status}")
|
||||||
|
|
||||||
|
await update.message.reply_text("👥 Registered users:\n\n" + "\n".join(lines)) # type: ignore[union-attr]
|
||||||
@@ -0,0 +1,31 @@
|
|||||||
|
import os
|
||||||
|
import logging
|
||||||
|
import asyncpg
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
_pool: asyncpg.Pool | None = None
|
||||||
|
|
||||||
|
|
||||||
|
async def get_pool() -> asyncpg.Pool:
|
||||||
|
global _pool
|
||||||
|
if _pool is None:
|
||||||
|
_pool = await asyncpg.create_pool(
|
||||||
|
host=os.getenv("POSTGRES_HOST", "db"),
|
||||||
|
port=int(os.getenv("POSTGRES_PORT", "5432")),
|
||||||
|
user=os.getenv("POSTGRES_USER", "postgres"),
|
||||||
|
password=os.getenv("POSTGRES_PASSWORD"),
|
||||||
|
database=os.getenv("POSTGRES_DB", "postgres"),
|
||||||
|
min_size=2,
|
||||||
|
max_size=10,
|
||||||
|
)
|
||||||
|
logger.info("Database pool initialized")
|
||||||
|
return _pool
|
||||||
|
|
||||||
|
|
||||||
|
async def close_pool() -> None:
|
||||||
|
global _pool
|
||||||
|
if _pool is not None:
|
||||||
|
await _pool.close()
|
||||||
|
logger.info("Database pool closed")
|
||||||
|
_pool = None
|
||||||
@@ -0,0 +1,202 @@
|
|||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import signal
|
||||||
|
import sys
|
||||||
|
from contextlib import suppress
|
||||||
|
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
from telegram import Update
|
||||||
|
from telegram.ext import Application, ExtBot
|
||||||
|
|
||||||
|
from db import close_pool, get_pool
|
||||||
|
from scraper import extract_ad_fields, fetch_ads
|
||||||
|
from notifier import log_notification, mark_notified, notify_new_ad
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
load_dotenv()
|
||||||
|
|
||||||
|
|
||||||
|
async def scheduler_task(pool: object, bot: ExtBot) -> None:
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
rows = await pool.fetch(
|
||||||
|
"""
|
||||||
|
SELECT sq.id, sq.keyword, sq.interval_minutes, u.telegram_id
|
||||||
|
FROM search_queries sq
|
||||||
|
JOIN users u ON sq.user_id = u.id
|
||||||
|
WHERE sq.is_active = true
|
||||||
|
AND (sq.last_scraped_at IS NULL OR
|
||||||
|
sq.last_scraped_at < now() - (sq.interval_minutes || ' minutes')::interval)
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
|
for row in rows:
|
||||||
|
query_id = str(row["id"])
|
||||||
|
keyword = row["keyword"]
|
||||||
|
telegram_id = row["telegram_id"]
|
||||||
|
|
||||||
|
logger.info("Scraping keyword '%s' for query %s", keyword, query_id)
|
||||||
|
|
||||||
|
try:
|
||||||
|
ads_raw, total_hits = await fetch_ads(keyword)
|
||||||
|
new_count = 0
|
||||||
|
|
||||||
|
for ad_data in ads_raw:
|
||||||
|
fields = extract_ad_fields(ad_data)
|
||||||
|
wh_ad_id = fields["wh_ad_id"]
|
||||||
|
|
||||||
|
existing = await pool.fetchrow(
|
||||||
|
"SELECT id FROM ads WHERE wh_ad_id = $1",
|
||||||
|
wh_ad_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not existing:
|
||||||
|
ad_row = await pool.fetchrow(
|
||||||
|
"""
|
||||||
|
INSERT INTO ads (wh_ad_id, raw_json, title, price, location, url, published_at)
|
||||||
|
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
||||||
|
RETURNING id
|
||||||
|
""",
|
||||||
|
wh_ad_id,
|
||||||
|
json.dumps(ad_data),
|
||||||
|
fields["title"],
|
||||||
|
fields["price"],
|
||||||
|
fields["location"],
|
||||||
|
fields["url"],
|
||||||
|
fields.get("published_at"),
|
||||||
|
)
|
||||||
|
ad_uuid = str(ad_row["id"])
|
||||||
|
else:
|
||||||
|
ad_uuid = str(existing["id"])
|
||||||
|
|
||||||
|
existing_qa = await pool.fetchrow(
|
||||||
|
"SELECT 1 FROM query_ads WHERE search_query_id = $1 AND ad_id = $2",
|
||||||
|
query_id,
|
||||||
|
ad_uuid,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not existing_qa:
|
||||||
|
await pool.execute(
|
||||||
|
"INSERT INTO query_ads (search_query_id, ad_id) VALUES ($1, $2)",
|
||||||
|
query_id,
|
||||||
|
ad_uuid,
|
||||||
|
)
|
||||||
|
|
||||||
|
user_row = await pool.fetchrow(
|
||||||
|
"SELECT id FROM users WHERE telegram_id = $1",
|
||||||
|
telegram_id,
|
||||||
|
)
|
||||||
|
user_id = str(user_row["id"]) if user_row else None
|
||||||
|
|
||||||
|
notify_fields = {**fields, "keyword": keyword}
|
||||||
|
await notify_new_ad(bot, telegram_id, notify_fields)
|
||||||
|
|
||||||
|
if user_id:
|
||||||
|
await mark_notified(pool, query_id, ad_uuid)
|
||||||
|
try:
|
||||||
|
msg_id = 0
|
||||||
|
await log_notification(pool, user_id, ad_uuid, msg_id)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Failed to log notification")
|
||||||
|
|
||||||
|
new_count += 1
|
||||||
|
logger.info(
|
||||||
|
"New ad %s found for query %s (keyword=%s)",
|
||||||
|
wh_ad_id,
|
||||||
|
query_id,
|
||||||
|
keyword,
|
||||||
|
)
|
||||||
|
|
||||||
|
await pool.execute(
|
||||||
|
"UPDATE search_queries SET last_scraped_at = now() WHERE id = $1",
|
||||||
|
query_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
await pool.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO scrape_logs (search_query_id, status, ads_found, new_ads)
|
||||||
|
VALUES ($1, 'success', $2, $3)
|
||||||
|
""",
|
||||||
|
query_id,
|
||||||
|
len(ads_raw),
|
||||||
|
new_count,
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Error scraping keyword '%s' (query %s)", keyword, query_id)
|
||||||
|
await pool.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO scrape_logs (search_query_id, status, error_message)
|
||||||
|
VALUES ($1, 'error', $2)
|
||||||
|
""",
|
||||||
|
query_id,
|
||||||
|
str(sys.exc_info()[1]),
|
||||||
|
)
|
||||||
|
|
||||||
|
await asyncio.sleep(5)
|
||||||
|
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Scheduler iteration error")
|
||||||
|
|
||||||
|
await asyncio.sleep(30)
|
||||||
|
|
||||||
|
|
||||||
|
async def main() -> None:
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO,
|
||||||
|
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
|
||||||
|
)
|
||||||
|
|
||||||
|
if not os.getenv("TELEGRAM_BOT_TOKEN"):
|
||||||
|
logger.error("TELEGRAM_BOT_TOKEN is required")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
pool = await get_pool()
|
||||||
|
|
||||||
|
app = Application.builder().token(os.getenv("TELEGRAM_BOT_TOKEN")).build()
|
||||||
|
|
||||||
|
from bot import register_handlers # noqa: E402
|
||||||
|
|
||||||
|
register_handlers(app.bot)
|
||||||
|
|
||||||
|
scheduler = asyncio.ensure_future(scheduler_task(pool, app.bot))
|
||||||
|
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
stop = loop.create_future()
|
||||||
|
|
||||||
|
def _signal_handler() -> None:
|
||||||
|
if not stop.done():
|
||||||
|
stop.set_result(True)
|
||||||
|
|
||||||
|
for sig in (signal.SIGINT, signal.SIGTERM):
|
||||||
|
loop.add_signal_handler(sig, _signal_handler)
|
||||||
|
|
||||||
|
try:
|
||||||
|
await app.initialize()
|
||||||
|
await app.start()
|
||||||
|
logger.info("Bot started with long polling")
|
||||||
|
|
||||||
|
poll_task = asyncio.ensure_future(app.updater.start_polling()) # type: ignore[attr-defined]
|
||||||
|
|
||||||
|
await stop
|
||||||
|
logger.info("Shutting down...")
|
||||||
|
|
||||||
|
finally:
|
||||||
|
scheduler.cancel()
|
||||||
|
with suppress(asyncio.CancelledError):
|
||||||
|
await scheduler
|
||||||
|
|
||||||
|
poll_task.cancel()
|
||||||
|
with suppress(asyncio.CancelledError):
|
||||||
|
await poll_task
|
||||||
|
|
||||||
|
await app.shutdown()
|
||||||
|
await close_pool()
|
||||||
|
logger.info("Shutdown complete")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
asyncio.run(main())
|
||||||
@@ -0,0 +1,78 @@
|
|||||||
|
import logging
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
import asyncpg
|
||||||
|
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
|
||||||
|
from telegram.ext import ExtBot
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
async def notify_new_ad(
|
||||||
|
bot: ExtBot,
|
||||||
|
telegram_id: int,
|
||||||
|
ad: dict,
|
||||||
|
) -> None:
|
||||||
|
price_str = f"{ad['price']:,.0f}" if ad.get("price") is not None else "N/A"
|
||||||
|
|
||||||
|
published_str = ""
|
||||||
|
if ad.get("published_at"):
|
||||||
|
try:
|
||||||
|
dt = datetime.fromisoformat(ad["published_at"])
|
||||||
|
published_str = f"Published: {dt:%d.%m.%Y %H:%M}"
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
published_str = f"Published: {ad['published_at']}"
|
||||||
|
|
||||||
|
location_line = f" | 📍 {ad['location']}" if ad.get("location") else ""
|
||||||
|
|
||||||
|
text = (
|
||||||
|
f"🔍 New listing found!\n\n"
|
||||||
|
f"{ad.get('title', 'Untitled')}\n"
|
||||||
|
f"💰 {price_str} €{location_line}\n"
|
||||||
|
)
|
||||||
|
if published_str:
|
||||||
|
text += f"{published_str}\n"
|
||||||
|
|
||||||
|
keyboard = []
|
||||||
|
if ad.get("url"):
|
||||||
|
keyboard.append(
|
||||||
|
[InlineKeyboardButton("View Ad", url=ad["url"])]
|
||||||
|
)
|
||||||
|
|
||||||
|
reply_markup: InlineKeyboardMarkup | None = (
|
||||||
|
InlineKeyboardMarkup(keyboard) if keyboard else None
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
message = await bot.send_message(
|
||||||
|
chat_id=telegram_id,
|
||||||
|
text=text,
|
||||||
|
reply_markup=reply_markup,
|
||||||
|
)
|
||||||
|
logger.info(
|
||||||
|
"Sent notification to %s for ad %s (msg_id=%s)",
|
||||||
|
telegram_id,
|
||||||
|
ad.get("wh_ad_id"),
|
||||||
|
message.message_id,
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Failed to send Telegram notification")
|
||||||
|
|
||||||
|
|
||||||
|
async def mark_notified(pool: asyncpg.Pool, search_query_id: str, ad_id: str) -> None:
|
||||||
|
await pool.execute(
|
||||||
|
"UPDATE query_ads SET is_notified = true WHERE search_query_id = $1 AND ad_id = $2",
|
||||||
|
search_query_id,
|
||||||
|
ad_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def log_notification(
|
||||||
|
pool: asyncpg.Pool, user_id: str, ad_id: str, message_id: int
|
||||||
|
) -> None:
|
||||||
|
await pool.execute(
|
||||||
|
"INSERT INTO notifications (user_id, ad_id, message_id) VALUES ($1, $2, $3)",
|
||||||
|
user_id,
|
||||||
|
ad_id,
|
||||||
|
message_id,
|
||||||
|
)
|
||||||
@@ -0,0 +1,96 @@
|
|||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from typing import Any
|
||||||
|
from urllib.parse import quote_plus
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
_API_URL = (
|
||||||
|
"https://www.willhaben.at/webapi/ad-search/search/atz/seo/"
|
||||||
|
"kaufen-und-verkaufen/marktplatz"
|
||||||
|
)
|
||||||
|
|
||||||
|
_HEADERS = {
|
||||||
|
"accept": "application/json",
|
||||||
|
"user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
|
||||||
|
"x-wh-client": "api@willhaben.at;responsive_web;server;1.0.0;desktop",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
async def fetch_ads(keyword: str) -> tuple[list[dict[str, Any]], int]:
|
||||||
|
params = {
|
||||||
|
"keyword": keyword,
|
||||||
|
"rows": 30,
|
||||||
|
"sort": 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
async with httpx.AsyncClient(timeout=30.0) as client:
|
||||||
|
for attempt in range(1, 4):
|
||||||
|
try:
|
||||||
|
resp = await client.get(_API_URL, headers=_HEADERS, params=params)
|
||||||
|
resp.raise_for_status()
|
||||||
|
data = resp.json()
|
||||||
|
break
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("fetch_ads attempt %d failed: %s", attempt, exc)
|
||||||
|
if attempt < 3:
|
||||||
|
await asyncio.sleep(2 ** attempt)
|
||||||
|
continue
|
||||||
|
raise
|
||||||
|
|
||||||
|
ads_raw = data.get("advertSummaryList", {}).get("advertSummary", [])
|
||||||
|
total_hits = int(data.get("rowsFound", 0))
|
||||||
|
return ads_raw, total_hits
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_attributes(ad_dict: dict[str, Any]) -> dict[str, str]:
|
||||||
|
attr_list = ad_dict.get("attributes", {}).get("attribute", [])
|
||||||
|
result: dict[str, str] = {}
|
||||||
|
for attr in attr_list:
|
||||||
|
name = attr.get("name")
|
||||||
|
values = attr.get("values", [])
|
||||||
|
if name and values:
|
||||||
|
result[name] = values[0]
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def extract_ad_fields(ad_dict: dict[str, Any]) -> dict[str, Any]:
|
||||||
|
attrs = _parse_attributes(ad_dict)
|
||||||
|
|
||||||
|
title = attrs.get("HEADING") or ad_dict.get("description", "")
|
||||||
|
|
||||||
|
# Price from "PRICE/AMOUNT" attribute (API format)
|
||||||
|
price_raw = attrs.get("PRICE/AMOUNT")
|
||||||
|
price: float | None = None
|
||||||
|
if price_raw is not None:
|
||||||
|
try:
|
||||||
|
price = float(str(price_raw).replace(",", ""))
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
location = attrs.get("LOCATION")
|
||||||
|
|
||||||
|
seo_url = attrs.get("SEO_URL")
|
||||||
|
url = f"https://www.willhaben.at/iad/{seo_url}" if seo_url else None
|
||||||
|
|
||||||
|
# Published time from CHANGED_String or PUBLISHED_String (ISO 8601)
|
||||||
|
published_raw = attrs.get("PUBLISHED_String") or attrs.get("CHANGED_String")
|
||||||
|
published_at: str | None = None
|
||||||
|
if published_raw:
|
||||||
|
try:
|
||||||
|
dt = datetime.fromisoformat(published_raw.replace("Z", "+00:00"))
|
||||||
|
published_at = dt.isoformat()
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
return {
|
||||||
|
"wh_ad_id": str(ad_dict.get("id", "")),
|
||||||
|
"title": title or "",
|
||||||
|
"price": price,
|
||||||
|
"location": location,
|
||||||
|
"url": url,
|
||||||
|
"published_at": published_at,
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user