fix: remove stale mark_notified import, full main.py scheduler refactor, fix scraper datetime+image extraction

This commit is contained in:
2026-06-17 08:27:34 +02:00
parent b93811bb1b
commit a21c310eeb
5 changed files with 752 additions and 120 deletions
+73 -78
View File
@@ -12,7 +12,7 @@ from telegram.ext import Application, ExtBot
from db import close_pool, get_pool
from scraper import extract_ad_fields, fetch_ads
from notifier import log_notification, mark_notified, notify_new_ad
from notifier import log_notification, notify_new_ad, notify_price_drop
logger = logging.getLogger(__name__)
@@ -23,117 +23,112 @@ async def scheduler_task(pool: object, bot: ExtBot) -> None:
while True:
try:
rows = await pool.fetch(
"""
SELECT sq.id, sq.keyword, sq.interval_minutes, u.telegram_id
FROM search_queries sq
JOIN users u ON sq.user_id = u.id
WHERE sq.is_active = true
AND (sq.last_scraped_at IS NULL OR
sq.last_scraped_at < now() - (sq.interval_minutes || ' minutes')::interval)
"""
"SELECT id, keyword, interval_minutes, initial_loaded FROM keywords "
"WHERE is_active = true "
"AND (last_scraped_at IS NULL OR last_scraped_at < now() - (interval_minutes || ' minutes')::interval)"
)
for row in rows:
query_id = str(row["id"])
kw_id = str(row["id"])
keyword = row["keyword"]
telegram_id = row["telegram_id"]
initial_loaded = row["initial_loaded"]
logger.info("Scraping keyword '%s' for query %s", keyword, query_id)
subs = await pool.fetch(
"SELECT telegram_id FROM users u JOIN keyword_subscriptions ks ON u.id = ks.user_id "
"WHERE ks.keyword_id = $1 AND u.is_active = true",
kw_id,
)
if not subs:
await pool.execute("UPDATE keywords SET is_active = false WHERE id = $1", kw_id)
continue
telegram_ids = [sub["telegram_id"] for sub in subs]
logger.info("Scraping keyword '%s' (%d subscriber(s))", keyword, len(telegram_ids))
try:
ads_raw, total_hits = await fetch_ads(keyword)
new_count = 0
if not initial_loaded and len(ads_raw) > 0:
logger.info("Initial baseline load for '%s' — indexing %d ads, no notifications", keyword, len(ads_raw))
for ad_data in ads_raw:
fields = extract_ad_fields(ad_data)
wh_ad_id = fields["wh_ad_id"]
is_price_drop = False
old_price = None
new_price = None
existing = await pool.fetchrow(
"SELECT id FROM ads WHERE wh_ad_id = $1",
"SELECT id, price FROM ads WHERE wh_ad_id = $1",
wh_ad_id,
)
if not existing:
ad_row = await pool.fetchrow(
"""
INSERT INTO ads (wh_ad_id, raw_json, title, price, location, url, published_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING id
""",
wh_ad_id,
json.dumps(ad_data),
fields["title"],
fields["price"],
fields["location"],
fields["url"],
fields.get("published_at"),
"INSERT INTO ads (wh_ad_id, raw_json, title, price, location, url, published_at, main_image_url, postcode, modified_at) "
"VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING id",
wh_ad_id, json.dumps(ad_data), fields["title"], fields["price"],
fields["location"], fields["url"], fields.get("published_at"),
fields.get("main_image_url"), fields.get("postcode"), fields.get("modified_at"),
)
ad_uuid = str(ad_row["id"])
else:
ad_uuid = str(existing["id"])
old_price = existing["price"]
new_price = fields["price"]
existing_qa = await pool.fetchrow(
"SELECT 1 FROM query_ads WHERE search_query_id = $1 AND ad_id = $2",
query_id,
ad_uuid,
)
if not existing_qa:
await pool.execute(
"INSERT INTO query_ads (search_query_id, ad_id) VALUES ($1, $2)",
query_id,
ad_uuid,
)
user_row = await pool.fetchrow(
"SELECT id FROM users WHERE telegram_id = $1",
telegram_id,
)
user_id = str(user_row["id"]) if user_row else None
if old_price is not None and new_price is not None and new_price < old_price:
await pool.execute(
"UPDATE ads SET price = $1, main_image_url = $2, postcode = $3, modified_at = $4 WHERE id = $5",
new_price, fields.get("main_image_url"), fields.get("postcode"), fields.get("modified_at"), ad_uuid,
)
await pool.execute(
"INSERT INTO price_history (ad_id, old_price, new_price) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING",
ad_uuid, old_price, new_price,
)
is_price_drop = True
else:
if fields.get("main_image_url") or fields.get("postcode"):
await pool.execute(
"UPDATE ads SET main_image_url = COALESCE($1, main_image_url), postcode = COALESCE($2, postcode) WHERE id = $3 AND (main_image_url IS NULL OR postcode IS NULL)",
fields.get("main_image_url"), fields.get("postcode"), ad_uuid,
)
if not initial_loaded:
notify_fields = {**fields, "keyword": keyword}
await notify_new_ad(bot, telegram_id, notify_fields)
if user_id:
await mark_notified(pool, query_id, ad_uuid)
try:
msg_id = 0
await log_notification(pool, user_id, ad_uuid, msg_id)
except Exception:
logger.exception("Failed to log notification")
for tg_id in telegram_ids:
await notify_new_ad(bot, tg_id, notify_fields)
new_count += 1
logger.info(
"New ad %s found for query %s (keyword=%s)",
wh_ad_id,
query_id,
keyword,
)
if is_price_drop:
notify_fields = {**fields, "keyword": keyword}
for tg_id in telegram_ids:
msg_id_val = await notify_price_drop(bot, tg_id, notify_fields)
if msg_id_val:
user_row = await pool.fetchrow("SELECT id FROM users WHERE telegram_id = $1", tg_id)
if user_row:
try:
await log_notification(pool, str(user_row["id"]), ad_uuid, msg_id_val)
except Exception:
logger.exception("Failed to log price drop notification")
if not initial_loaded:
await pool.execute("UPDATE keywords SET initial_loaded = true WHERE id = $1", kw_id)
await pool.execute("UPDATE keywords SET last_scraped_at = now() WHERE id = $1", kw_id)
await pool.execute(
"UPDATE search_queries SET last_scraped_at = now() WHERE id = $1",
query_id,
)
await pool.execute(
"""
INSERT INTO scrape_logs (search_query_id, status, ads_found, new_ads)
VALUES ($1, 'success', $2, $3)
""",
query_id,
len(ads_raw),
new_count,
"INSERT INTO scrape_logs (keyword_id, status, ads_found, new_ads) VALUES ($1, 'success', $2, $3)",
kw_id, len(ads_raw), new_count,
)
except Exception:
logger.exception("Error scraping keyword '%s' (query %s)", keyword, query_id)
logger.exception("Error scraping keyword '%s' (%s)", keyword, kw_id)
await pool.execute(
"""
INSERT INTO scrape_logs (search_query_id, status, error_message)
VALUES ($1, 'error', $2)
""",
query_id,
str(sys.exc_info()[1]),
"INSERT INTO scrape_logs (keyword_id, status, error_message) VALUES ($1, 'error', $2)",
kw_id, str(sys.exc_info()[1]),
)
await asyncio.sleep(5)
+21 -3
View File
@@ -78,11 +78,26 @@ def extract_ad_fields(ad_dict: dict[str, Any]) -> dict[str, Any]:
# Published time from PUBLISHED_String, falling back to CHANGED_String (ISO 8601)
published_raw = attrs.get("PUBLISHED_String") or attrs.get("CHANGED_String")
published_at: str | None = None
published_at: datetime | None = None
if published_raw:
try:
dt = datetime.fromisoformat(published_raw.replace("Z", "+00:00"))
published_at = dt.isoformat()
published_at = datetime.fromisoformat(published_raw.replace("Z", "+00:00"))
except (ValueError, TypeError):
pass
# Main image from the first advertImage entry
images = ad_dict.get("advertImageList", {}).get("advertImage", [])
main_image_url: str | None = None
if images and isinstance(images[0], dict):
main_image_url = images[0].get("referenceImageUrl")
postcode = attrs.get("POSTCODE")
modified_raw = attrs.get("CHANGED_String")
modified_at: datetime | None = None
if modified_raw:
try:
modified_at = datetime.fromisoformat(modified_raw.replace("Z", "+00:00"))
except (ValueError, TypeError):
pass
@@ -93,4 +108,7 @@ def extract_ad_fields(ad_dict: dict[str, Any]) -> dict[str, Any]:
"location": location,
"url": url,
"published_at": published_at,
"main_image_url": main_image_url,
"postcode": postcode,
"modified_at": modified_at,
}