156 lines
4.4 KiB
Python
156 lines
4.4 KiB
Python
import html
import logging
from datetime import datetime
from typing import Any

import asyncpg
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import ExtBot
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _build_keyboard(ad: dict[str, Any]) -> InlineKeyboardMarkup | None:
|
|
keyboard: list[list[InlineKeyboardButton]] = []
|
|
if ad.get("url"):
|
|
keyboard.append(
|
|
[InlineKeyboardButton("View Ad →", url=ad["url"])]
|
|
)
|
|
return InlineKeyboardMarkup(keyboard) if keyboard else None
|
|
|
|
|
|
def _format_text(header: str, ad: dict[str, Any]) -> str:
|
|
title = ad.get("title", "Untitled")
|
|
|
|
price_str = f"{ad['price']:,.0f}" if ad.get("price") is not None else "N/A"
|
|
|
|
location_part = ""
|
|
if ad.get("location"):
|
|
if ad.get("postcode"):
|
|
location_part = f" 📍 {ad['location']}, {ad['postcode']}"
|
|
else:
|
|
location_part = f" 📍 {ad['location']}"
|
|
|
|
published_str = ""
|
|
if ad.get("published_at"):
|
|
pub = ad["published_at"]
|
|
if isinstance(pub, datetime):
|
|
published_str = f"{pub.strftime('%d.%m.%Y %H:%M')}"
|
|
else:
|
|
try:
|
|
dt = datetime.fromisoformat(str(pub))
|
|
published_str = f"{dt.strftime('%d.%m.%Y %H:%M')}"
|
|
except (ValueError, TypeError):
|
|
published_str = str(pub)
|
|
|
|
modified_str = ""
|
|
if ad.get("modified_at"):
|
|
mod = ad["modified_at"]
|
|
if isinstance(mod, datetime):
|
|
modified_str = f"{mod.strftime('%d.%m.%Y %H:%M')}"
|
|
else:
|
|
try:
|
|
dt = datetime.fromisoformat(str(mod))
|
|
modified_str = f"{dt.strftime('%d.%m.%Y %H:%M')}"
|
|
except (ValueError, TypeError):
|
|
modified_str = str(mod)
|
|
|
|
text_lines = [
|
|
header,
|
|
"",
|
|
f"<b>{title}</b>",
|
|
"",
|
|
f"💰 {price_str} €{location_part}",
|
|
]
|
|
if published_str:
|
|
pub_line = f"Published: {published_str}"
|
|
if modified_str:
|
|
pub_line += f" | Modified: {modified_str}"
|
|
text_lines.append(pub_line)
|
|
|
|
return "\n".join(text_lines)
|
|
|
|
|
|
async def _send_ad_notification(
    bot: ExtBot,
    telegram_id: int,
    ad: dict[str, Any],
    *,
    header: str,
    success_log: str,
) -> int | None:
    """Send one ad message to a chat; shared by the public ``notify_*`` functions.

    Args:
        bot: Bot used to send the message.
        telegram_id: Target chat id.
        ad: Ad record passed to ``_format_text`` / ``_build_keyboard``;
            ``main_image_url`` and ``wh_ad_id`` are also read here.
        header: First line of the message body.
        success_log: ``logging`` format string taking three ``%s`` arguments:
            chat id, ``ad["wh_ad_id"]`` and the sent message id.

    Returns:
        The id of the sent message, or ``None`` if sending raised (the
        exception is logged, not propagated).
    """
    # Formatting happens outside the try: errors here propagate to the caller.
    text = _format_text(header, ad)
    reply_markup = _build_keyboard(ad)

    try:
        if ad.get("main_image_url"):
            # With an image the text travels as the photo caption.
            message = await bot.send_photo(
                chat_id=telegram_id,
                photo=ad["main_image_url"],
                caption=text,
                parse_mode="HTML",
                reply_markup=reply_markup,
            )
        else:
            message = await bot.send_message(
                chat_id=telegram_id,
                text=text,
                parse_mode="HTML",
                reply_markup=reply_markup,
            )

        logger.info(
            success_log,
            telegram_id,
            ad.get("wh_ad_id"),
            message.message_id,
        )
        return message.message_id
    except Exception:
        # Best-effort delivery: log with traceback and report failure as None.
        logger.exception("Failed to send Telegram notification")
        return None


async def notify_new_ad(
    bot: ExtBot,
    telegram_id: int,
    ad: dict[str, Any],
) -> int | None:
    """Notify a chat about a newly found listing.

    Sends a photo with caption when ``ad["main_image_url"]`` is truthy,
    otherwise a plain text message, both in HTML parse mode.

    Returns:
        The sent message id, or ``None`` if sending failed (failure is logged).
    """
    return await _send_ad_notification(
        bot,
        telegram_id,
        ad,
        header="🆕 New listing found!",
        success_log="Sent notification to %s for ad %s (msg_id=%s)",
    )


async def notify_price_drop(
    bot: ExtBot,
    telegram_id: int,
    ad: dict[str, Any],
) -> int | None:
    """Notify a chat about a price drop on an ad.

    Sends a photo with caption when ``ad["main_image_url"]`` is truthy,
    otherwise a plain text message, both in HTML parse mode.

    Returns:
        The sent message id, or ``None`` if sending failed (failure is logged).
    """
    return await _send_ad_notification(
        bot,
        telegram_id,
        ad,
        header="⚠️ Price drop!",
        success_log="Sent price drop to %s for ad %s (msg_id=%s)",
    )
|
|
|
|
|
|
async def log_notification(
    pool: asyncpg.Pool, user_id: str, ad_id: str, message_id: int
) -> None:
    """Insert one row into the ``notifications`` table.

    Args:
        pool: Connection pool whose ``execute`` runs the statement.
        user_id: Value bound to the ``user_id`` column.
        ad_id: Value bound to the ``ad_id`` column.
        message_id: Value bound to the ``message_id`` column.
    """
    insert_sql = (
        "INSERT INTO notifications (user_id, ad_id, message_id) VALUES ($1, $2, $3)"
    )
    # Values are passed as bound parameters, in placeholder order.
    row_values = (user_id, ad_id, message_id)
    await pool.execute(insert_sql, *row_values)
|