init
This commit is contained in:
Generated
+633
-36
File diff suppressed because it is too large
Load Diff
+5
-2
@@ -12,7 +12,7 @@ tower-sessions = "0.14"
|
||||
tower-sessions-sqlx-store = { version = "0.15", features = ["postgres"] }
|
||||
|
||||
sqlx = { version = "0.8", default-features = false, features = [
|
||||
"runtime-tokio", "tls-rustls", "postgres", "uuid", "time", "macros",
|
||||
"runtime-tokio", "tls-rustls", "postgres", "uuid", "time", "macros", "rust_decimal",
|
||||
] }
|
||||
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
@@ -32,6 +32,9 @@ thiserror = "2"
|
||||
anyhow = "1"
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
time = { version = "0.3", features = ["serde"] }
|
||||
time = { version = "0.3", features = ["serde", "serde-well-known"] }
|
||||
uuid = { version = "1", features = ["v4", "serde"] }
|
||||
rust_decimal = { version = "1", features = ["serde-float"] }
|
||||
dotenvy = "0.15"
|
||||
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls", "gzip"] }
|
||||
url = "2"
|
||||
|
||||
@@ -0,0 +1,53 @@
|
||||
-- Phase 2: topic-based wantlists + items
|
||||
-- A "list" is a topic (clothes, gear, …). An "item" is a thing the user covets,
|
||||
-- usually backed by a pasted product URL. Price/metadata columns are filled by the
|
||||
-- Phase 3 refetch worker (generic Shopify .json adapter, etc.) and stay NULL until then.
|
||||
|
||||
CREATE TABLE lists (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
|
||||
name TEXT NOT NULL,
|
||||
emoji TEXT, -- optional decorative glyph
|
||||
description TEXT,
|
||||
position INTEGER NOT NULL DEFAULT 0, -- manual ordering
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
|
||||
CREATE INDEX idx_lists_user ON lists(user_id);
|
||||
|
||||
CREATE TYPE item_status AS ENUM ('coveted', 'acquired', 'renounced');
|
||||
|
||||
CREATE TABLE items (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
list_id UUID NOT NULL REFERENCES lists(id) ON DELETE CASCADE,
|
||||
title TEXT NOT NULL,
|
||||
url TEXT, -- pasted product URL (Phase 3 tracks this)
|
||||
note TEXT,
|
||||
status item_status NOT NULL DEFAULT 'coveted',
|
||||
target_price NUMERIC(12, 2), -- alert threshold the user sets
|
||||
position INTEGER NOT NULL DEFAULT 0,
|
||||
|
||||
-- Filled by the Phase 3 fetcher; NULL until first successful fetch.
|
||||
title_fetched TEXT,
|
||||
current_price NUMERIC(12, 2),
|
||||
currency TEXT, -- ISO 4217, e.g. 'EUR'
|
||||
image_url TEXT,
|
||||
in_stock BOOLEAN,
|
||||
source TEXT, -- adapter that produced the data, e.g. 'shopify'
|
||||
fetched_at TIMESTAMPTZ,
|
||||
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
|
||||
CREATE INDEX idx_items_list ON items(list_id);
|
||||
CREATE INDEX idx_items_url ON items(url) WHERE url IS NOT NULL;
|
||||
|
||||
CREATE TRIGGER trg_lists_updated
|
||||
BEFORE UPDATE ON lists
|
||||
FOR EACH ROW EXECUTE FUNCTION set_updated_at();
|
||||
|
||||
CREATE TRIGGER trg_items_updated
|
||||
BEFORE UPDATE ON items
|
||||
FOR EACH ROW EXECUTE FUNCTION set_updated_at();
|
||||
@@ -0,0 +1,25 @@
|
||||
-- Phase 3: price tracking. The refetch worker pulls product data for items that
|
||||
-- carry a URL (generic Shopify .json adapter first), updates the item's metadata
|
||||
-- columns, and appends a row to price_history on every successful fetch.
|
||||
|
||||
-- Per-item tracking control + last fetch outcome.
|
||||
ALTER TABLE items
|
||||
ADD COLUMN track_enabled BOOLEAN NOT NULL DEFAULT true,
|
||||
ADD COLUMN last_error TEXT,
|
||||
ADD COLUMN checked_at TIMESTAMPTZ; -- last fetch attempt (success or failure)
|
||||
|
||||
-- Append-only price observations. One row per successful fetch.
|
||||
CREATE TABLE price_history (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
item_id UUID NOT NULL REFERENCES items(id) ON DELETE CASCADE,
|
||||
price NUMERIC(12, 2) NOT NULL,
|
||||
currency TEXT NOT NULL,
|
||||
in_stock BOOLEAN,
|
||||
fetched_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
|
||||
CREATE INDEX idx_price_history_item ON price_history(item_id, fetched_at DESC);
|
||||
|
||||
-- Worker scan: trackable items that have a URL, cheapest checked first.
|
||||
CREATE INDEX idx_items_trackable ON items(checked_at NULLS FIRST)
|
||||
WHERE url IS NOT NULL AND track_enabled;
|
||||
@@ -0,0 +1,6 @@
|
||||
-- Phase 4: price-drop notifications.
|
||||
-- Tracks when we last emailed the owner that an item reached its target price.
|
||||
-- NULL = "armed": a future drop to/under target will notify. Stamped non-NULL
|
||||
-- after sending; cleared (re-armed) when the price rises back above target.
|
||||
ALTER TABLE items
|
||||
ADD COLUMN notified_at TIMESTAMPTZ;
|
||||
@@ -1,4 +1,6 @@
|
||||
use argon2::password_hash::{rand_core::OsRng, PasswordHash, PasswordHasher, PasswordVerifier, SaltString};
|
||||
use argon2::password_hash::{
|
||||
rand_core::OsRng, PasswordHash, PasswordHasher, PasswordVerifier, SaltString,
|
||||
};
|
||||
use argon2::Argon2;
|
||||
|
||||
use crate::error::{AppError, AppResult};
|
||||
|
||||
+20
-10
@@ -91,7 +91,11 @@ async fn register(
|
||||
validate(&req)?;
|
||||
|
||||
let hash = hash_password(&req.password)?;
|
||||
let display = req.display_name.as_deref().map(str::trim).filter(|s| !s.is_empty());
|
||||
let display = req
|
||||
.display_name
|
||||
.as_deref()
|
||||
.map(str::trim)
|
||||
.filter(|s| !s.is_empty());
|
||||
|
||||
let user = sqlx::query_as::<_, User>(
|
||||
"INSERT INTO users (email, password_hash, display_name)
|
||||
@@ -190,9 +194,13 @@ async fn request_password_reset(
|
||||
.fetch_optional(&state.pool)
|
||||
.await?
|
||||
{
|
||||
let token =
|
||||
tokens::create(&state.pool, user.id, TokenPurpose::PasswordReset, Duration::hours(1))
|
||||
.await?;
|
||||
let token = tokens::create(
|
||||
&state.pool,
|
||||
user.id,
|
||||
TokenPurpose::PasswordReset,
|
||||
Duration::hours(1),
|
||||
)
|
||||
.await?;
|
||||
let link = format!("{}/reset?token={}", state.config.public_app_url, token);
|
||||
let _ = state
|
||||
.mailer
|
||||
@@ -225,10 +233,7 @@ async fn reset_password(
|
||||
Ok(StatusCode::NO_CONTENT)
|
||||
}
|
||||
|
||||
async fn me(
|
||||
State(state): State<AppState>,
|
||||
AuthUser(user): AuthUser,
|
||||
) -> AppResult<Json<MeResp>> {
|
||||
async fn me(State(state): State<AppState>, AuthUser(user): AuthUser) -> AppResult<Json<MeResp>> {
|
||||
let settings = sqlx::query_as::<_, UserSettings>(
|
||||
"SELECT user_id, locale, currency, theme, notify_email
|
||||
FROM user_settings WHERE user_id = $1",
|
||||
@@ -246,8 +251,13 @@ async fn me(
|
||||
// ── Helpers ─────────────────────────────────────────────────
|
||||
|
||||
async fn send_verification_email(state: &AppState, user: &User) -> AppResult<()> {
|
||||
let token =
|
||||
tokens::create(&state.pool, user.id, TokenPurpose::VerifyEmail, Duration::hours(24)).await?;
|
||||
let token = tokens::create(
|
||||
&state.pool,
|
||||
user.id,
|
||||
TokenPurpose::VerifyEmail,
|
||||
Duration::hours(24),
|
||||
)
|
||||
.await?;
|
||||
let link = format!("{}/verify?token={}", state.config.public_app_url, token);
|
||||
state
|
||||
.mailer
|
||||
|
||||
@@ -72,7 +72,5 @@ pub async fn consume(pool: &PgPool, raw: &str, purpose: TokenPurpose) -> AppResu
|
||||
.fetch_optional(pool)
|
||||
.await?;
|
||||
|
||||
user_id.ok_or(AppError::BadRequest(
|
||||
"invalid or expired token".to_string(),
|
||||
))
|
||||
user_id.ok_or(AppError::BadRequest("invalid or expired token".to_string()))
|
||||
}
|
||||
|
||||
@@ -10,6 +10,12 @@ pub struct Config {
|
||||
pub public_app_url: String,
|
||||
pub cors_origins: Vec<String>,
|
||||
pub smtp: SmtpConfig,
|
||||
/// Background refetch worker tick. 0 disables the worker.
|
||||
pub refetch_interval_secs: u64,
|
||||
/// Min age before an item is eligible for the next automatic refetch.
|
||||
pub refetch_min_age_secs: i64,
|
||||
/// Default ISO 4217 currency when an adapter can't determine one.
|
||||
pub default_currency: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -58,6 +64,9 @@ impl Config {
|
||||
session_secret,
|
||||
public_app_url: opt("PUBLIC_APP_URL", "http://localhost:5173"),
|
||||
cors_origins,
|
||||
refetch_interval_secs: opt("REFETCH_INTERVAL_SECS", "300").parse()?,
|
||||
refetch_min_age_secs: opt("REFETCH_MIN_AGE_SECS", "21600").parse()?,
|
||||
default_currency: opt("DEFAULT_CURRENCY", "EUR").to_uppercase(),
|
||||
smtp: SmtpConfig {
|
||||
host: opt("SMTP_HOST", "localhost"),
|
||||
port: opt("SMTP_PORT", "587").parse()?,
|
||||
|
||||
@@ -0,0 +1,39 @@
|
||||
//! Product data adapters. The deal source is a user-pasted product URL — no
|
||||
//! per-retailer scrapers. Adapters are generic platform readers; the first is
|
||||
//! Shopify, whose storefronts expose a public `/products/{handle}.json` document.
|
||||
|
||||
use rust_decimal::Decimal;
|
||||
|
||||
mod shopify;
|
||||
|
||||
/// Normalised product snapshot produced by an adapter.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct FetchedProduct {
|
||||
pub title: String,
|
||||
pub price: Decimal,
|
||||
pub currency: String,
|
||||
pub image_url: Option<String>,
|
||||
pub in_stock: Option<bool>,
|
||||
pub source: &'static str,
|
||||
}
|
||||
|
||||
/// A shared HTTP client tuned for storefront fetches.
|
||||
pub fn http_client() -> reqwest::Client {
|
||||
reqwest::Client::builder()
|
||||
.user_agent("consumers-bot/0.1 (+self-hosted wantlist price watcher)")
|
||||
.timeout(std::time::Duration::from_secs(15))
|
||||
.build()
|
||||
.expect("failed to build reqwest client")
|
||||
}
|
||||
|
||||
/// Try every adapter in turn. Returns the first that recognises the URL.
|
||||
pub async fn fetch_product(
|
||||
client: &reqwest::Client,
|
||||
url: &str,
|
||||
default_currency: &str,
|
||||
) -> anyhow::Result<FetchedProduct> {
|
||||
if let Some(p) = shopify::fetch(client, url, default_currency).await? {
|
||||
return Ok(p);
|
||||
}
|
||||
anyhow::bail!("no adapter could read this URL (only Shopify storefronts are supported for now)")
|
||||
}
|
||||
@@ -0,0 +1,185 @@
|
||||
//! Generic Shopify storefront adapter.
|
||||
//!
|
||||
//! Every Shopify shop exposes an Ajax product document at
|
||||
//! `https://{shop}/products/{handle}.js`, which carries title, image, per-variant
|
||||
//! price (integer minor units) and availability. We derive that URL from the
|
||||
//! pasted product URL (any path with a `/products/{handle}` segment). No shop is
|
||||
//! hardcoded.
|
||||
//!
|
||||
//! Pricing is the subtle part. The `.js` doc reports the shop's *base* currency
|
||||
//! price. Shops using Shopify Markets show visitors a converted *presentment*
|
||||
//! price (e.g. a PLN shop shows EUR in the EU). That conversion is reachable
|
||||
//! generically via `.js?currency=EUR`. So we fetch both and:
|
||||
//! - if the converted price differs from the base price → Markets converted it,
|
||||
//! and the price is genuinely in our requested currency;
|
||||
//! - if they're equal → no conversion happened, so the price is in the shop's
|
||||
//! base currency (read from `/meta.json`), not our requested one.
|
||||
//! This stops us from mislabelling e.g. "821 EUR" when the value is 821 PLN.
|
||||
|
||||
use rust_decimal::Decimal;
|
||||
use serde::Deserialize;
|
||||
use url::{Position, Url};
|
||||
|
||||
use super::FetchedProduct;
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct JsDoc {
|
||||
#[serde(default)]
|
||||
title: Option<String>,
|
||||
#[serde(default)]
|
||||
featured_image: Option<String>,
|
||||
#[serde(default)]
|
||||
variants: Vec<JsVariant>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct JsVariant {
|
||||
/// Price in the currency's minor units (cents), e.g. 19795 = 197.95.
|
||||
price: i64,
|
||||
#[serde(default)]
|
||||
available: Option<bool>,
|
||||
}
|
||||
|
||||
/// Returns `Ok(None)` when the URL isn't a Shopify product URL (so other
|
||||
/// adapters could try), `Err` when it looks like one but the fetch/parse fails.
|
||||
pub async fn fetch(
|
||||
client: &reqwest::Client,
|
||||
raw_url: &str,
|
||||
default_currency: &str,
|
||||
) -> anyhow::Result<Option<FetchedProduct>> {
|
||||
let Some(base_url) = product_doc_url(raw_url, "js") else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
// Presentment price in the requested currency (Markets-converted if enabled).
|
||||
let conv_url = format!("{base_url}?currency={default_currency}");
|
||||
let Some(conv) = fetch_js(client, &conv_url).await? else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let Some(conv_cents) = cheapest(&conv.variants) else {
|
||||
anyhow::bail!("Shopify product has no readable price");
|
||||
};
|
||||
|
||||
// Base price (no currency param) to detect whether conversion happened.
|
||||
let base = fetch_js(client, &base_url).await?;
|
||||
let base_cents = base.as_ref().and_then(|b| cheapest(&b.variants));
|
||||
|
||||
let (cents, currency) = match base_cents {
|
||||
// Converted: value really is in the requested currency.
|
||||
Some(b) if b != conv_cents => (conv_cents, default_currency.to_string()),
|
||||
// No conversion: value is the shop's base currency.
|
||||
Some(b) => (
|
||||
b,
|
||||
shop_currency(client, raw_url)
|
||||
.await
|
||||
.unwrap_or_else(|| default_currency.to_string()),
|
||||
),
|
||||
None => (conv_cents, default_currency.to_string()),
|
||||
};
|
||||
|
||||
let in_stock = availability(&conv.variants);
|
||||
let title = conv
|
||||
.title
|
||||
.clone()
|
||||
.or_else(|| base.as_ref().and_then(|b| b.title.clone()))
|
||||
.unwrap_or_else(|| "Untitled product".to_string());
|
||||
let image_url = conv
|
||||
.featured_image
|
||||
.clone()
|
||||
.or_else(|| base.and_then(|b| b.featured_image))
|
||||
.map(normalize_image);
|
||||
|
||||
Ok(Some(FetchedProduct {
|
||||
title,
|
||||
price: Decimal::new(cents, 2),
|
||||
currency,
|
||||
image_url,
|
||||
in_stock,
|
||||
source: "shopify",
|
||||
}))
|
||||
}
|
||||
|
||||
/// GET a `.js` product doc. `Ok(None)` if it isn't a Shopify product document.
|
||||
async fn fetch_js(client: &reqwest::Client, url: &str) -> anyhow::Result<Option<JsDoc>> {
|
||||
let resp = client.get(url).send().await?;
|
||||
if !resp.status().is_success() {
|
||||
return Ok(None);
|
||||
}
|
||||
let body = resp.text().await?;
|
||||
Ok(serde_json::from_str::<JsDoc>(&body).ok())
|
||||
}
|
||||
|
||||
/// Cheapest available variant's price (minor units); falls back to cheapest
|
||||
/// overall. `None` if there are no priced variants.
|
||||
fn cheapest(variants: &[JsVariant]) -> Option<i64> {
|
||||
variants
|
||||
.iter()
|
||||
.filter(|v| v.available == Some(true))
|
||||
.map(|v| v.price)
|
||||
.min()
|
||||
.or_else(|| variants.iter().map(|v| v.price).min())
|
||||
}
|
||||
|
||||
/// `Some(true/false)` if any variant reported availability, else `None`.
|
||||
fn availability(variants: &[JsVariant]) -> Option<bool> {
|
||||
if variants.iter().all(|v| v.available.is_none()) {
|
||||
return None;
|
||||
}
|
||||
Some(variants.iter().any(|v| v.available == Some(true)))
|
||||
}
|
||||
|
||||
/// The shop's base ISO 4217 currency, from `{origin}/meta.json`. Best-effort.
|
||||
async fn shop_currency(client: &reqwest::Client, raw_url: &str) -> Option<String> {
|
||||
#[derive(Deserialize)]
|
||||
struct Meta {
|
||||
currency: String,
|
||||
}
|
||||
let origin = origin_of(raw_url)?;
|
||||
let resp = client
|
||||
.get(format!("{origin}/meta.json"))
|
||||
.send()
|
||||
.await
|
||||
.ok()?;
|
||||
if !resp.status().is_success() {
|
||||
return None;
|
||||
}
|
||||
let meta: Meta = resp.json().await.ok()?;
|
||||
let c = meta.currency.trim().to_uppercase();
|
||||
(c.len() == 3).then_some(c)
|
||||
}
|
||||
|
||||
/// Shopify image URLs are often protocol-relative (`//cdn.shopify.com/...`).
|
||||
fn normalize_image(src: String) -> String {
|
||||
if let Some(rest) = src.strip_prefix("//") {
|
||||
format!("https://{rest}")
|
||||
} else {
|
||||
src
|
||||
}
|
||||
}
|
||||
|
||||
fn origin_of(raw: &str) -> Option<String> {
|
||||
let u = Url::parse(raw).ok()?;
|
||||
if !matches!(u.scheme(), "http" | "https") {
|
||||
return None;
|
||||
}
|
||||
Some(u[..Position::BeforePath].to_string())
|
||||
}
|
||||
|
||||
/// Build `{origin}/products/{handle}.{ext}`, or `None` if there's no
|
||||
/// `/products/{handle}` segment.
|
||||
fn product_doc_url(raw: &str, ext: &str) -> Option<String> {
|
||||
let u = Url::parse(raw).ok()?;
|
||||
if !matches!(u.scheme(), "http" | "https") {
|
||||
return None;
|
||||
}
|
||||
let segs: Vec<&str> = u.path_segments()?.filter(|s| !s.is_empty()).collect();
|
||||
let pos = segs.iter().position(|s| *s == "products")?;
|
||||
let handle = segs.get(pos + 1)?;
|
||||
let handle = handle.trim_end_matches(".json").trim_end_matches(".js");
|
||||
if handle.is_empty() {
|
||||
return None;
|
||||
}
|
||||
let origin = &u[..Position::BeforePath];
|
||||
Some(format!("{origin}/products/{handle}.{ext}"))
|
||||
}
|
||||
@@ -26,10 +26,8 @@ impl Mailer {
|
||||
.port(cfg.port);
|
||||
|
||||
if !cfg.username.is_empty() {
|
||||
builder = builder.credentials(Credentials::new(
|
||||
cfg.username.clone(),
|
||||
cfg.password.clone(),
|
||||
));
|
||||
builder =
|
||||
builder.credentials(Credentials::new(cfg.username.clone(), cfg.password.clone()));
|
||||
}
|
||||
|
||||
let from: Mailbox = cfg
|
||||
|
||||
@@ -2,10 +2,13 @@ mod auth;
|
||||
mod config;
|
||||
mod db;
|
||||
mod error;
|
||||
mod fetch;
|
||||
mod mail;
|
||||
mod models;
|
||||
mod notify;
|
||||
mod routes;
|
||||
mod state;
|
||||
mod worker;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -50,8 +53,11 @@ async fn main() -> anyhow::Result<()> {
|
||||
pool,
|
||||
config: Arc::new(config.clone()),
|
||||
mailer,
|
||||
http: fetch::http_client(),
|
||||
};
|
||||
|
||||
worker::spawn(state.clone());
|
||||
|
||||
let api = Router::new()
|
||||
.merge(routes::router())
|
||||
.nest("/auth", auth::routes::router());
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use rust_decimal::Decimal;
|
||||
use serde::Serialize;
|
||||
use time::OffsetDateTime;
|
||||
use uuid::Uuid;
|
||||
@@ -43,3 +44,62 @@ pub struct UserSettings {
|
||||
pub theme: String,
|
||||
pub notify_email: bool,
|
||||
}
|
||||
|
||||
/// A topic-based wantlist ("altar"). Scoped to a user.
|
||||
#[derive(Debug, Serialize, sqlx::FromRow)]
|
||||
pub struct List {
|
||||
pub id: Uuid,
|
||||
#[serde(skip)]
|
||||
pub user_id: Uuid,
|
||||
pub name: String,
|
||||
pub emoji: Option<String>,
|
||||
pub description: Option<String>,
|
||||
pub position: i32,
|
||||
#[serde(with = "time::serde::rfc3339")]
|
||||
pub created_at: OffsetDateTime,
|
||||
#[serde(with = "time::serde::rfc3339")]
|
||||
pub updated_at: OffsetDateTime,
|
||||
}
|
||||
|
||||
/// A coveted thing inside a list. Price/metadata columns are filled by the
|
||||
/// Phase 3 refetch worker and stay None until then.
|
||||
#[derive(Debug, Serialize, sqlx::FromRow)]
|
||||
pub struct Item {
|
||||
pub id: Uuid,
|
||||
pub list_id: Uuid,
|
||||
pub title: String,
|
||||
pub url: Option<String>,
|
||||
pub note: Option<String>,
|
||||
pub status: String,
|
||||
pub target_price: Option<Decimal>,
|
||||
pub position: i32,
|
||||
|
||||
pub title_fetched: Option<String>,
|
||||
pub current_price: Option<Decimal>,
|
||||
pub currency: Option<String>,
|
||||
pub image_url: Option<String>,
|
||||
pub in_stock: Option<bool>,
|
||||
pub source: Option<String>,
|
||||
#[serde(with = "time::serde::rfc3339::option")]
|
||||
pub fetched_at: Option<OffsetDateTime>,
|
||||
|
||||
pub track_enabled: bool,
|
||||
pub last_error: Option<String>,
|
||||
#[serde(with = "time::serde::rfc3339::option")]
|
||||
pub checked_at: Option<OffsetDateTime>,
|
||||
|
||||
#[serde(with = "time::serde::rfc3339")]
|
||||
pub created_at: OffsetDateTime,
|
||||
#[serde(with = "time::serde::rfc3339")]
|
||||
pub updated_at: OffsetDateTime,
|
||||
}
|
||||
|
||||
/// One observed price point for an item. Append-only.
|
||||
#[derive(Debug, Serialize, sqlx::FromRow)]
|
||||
pub struct PricePoint {
|
||||
pub price: Decimal,
|
||||
pub currency: String,
|
||||
pub in_stock: Option<bool>,
|
||||
#[serde(with = "time::serde::rfc3339")]
|
||||
pub fetched_at: OffsetDateTime,
|
||||
}
|
||||
|
||||
@@ -0,0 +1,143 @@
|
||||
//! Phase 4: price-drop notifications.
|
||||
//!
|
||||
//! After a successful refetch we check whether an item's watched price has
|
||||
//! fallen to or below the owner's target. If so — and we haven't already told
|
||||
//! them about this drop — we email them in the house gospel voice. The
|
||||
//! `items.notified_at` column is the de-dupe latch:
|
||||
//! - `NULL` = armed; a drop to/under target fires one email and stamps `now()`.
|
||||
//! - non-NULL = already announced; stays quiet until the price rises back
|
||||
//! above target, which clears the latch (re-arms) for the next drop.
|
||||
|
||||
use rust_decimal::Decimal;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::state::AppState;
|
||||
|
||||
/// Row gathered for one item's notification decision.
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct NotifyRow {
|
||||
title: String,
|
||||
title_fetched: Option<String>,
|
||||
url: Option<String>,
|
||||
current_price: Option<Decimal>,
|
||||
target_price: Option<Decimal>,
|
||||
currency: Option<String>,
|
||||
in_stock: Option<bool>,
|
||||
notified_at: Option<time::OffsetDateTime>,
|
||||
email: String,
|
||||
display_name: Option<String>,
|
||||
notify_email: bool,
|
||||
}
|
||||
|
||||
/// Inspect one item after refetch and email its owner if the price just reached
|
||||
/// the target. Best-effort: never returns an error to the caller (a failed
|
||||
/// send must not fail the refetch); failures are logged.
|
||||
pub async fn maybe_notify_drop(state: &AppState, item_id: Uuid) {
|
||||
if let Err(e) = run(state, item_id).await {
|
||||
tracing::warn!(item = %item_id, error = %e, "price-drop notification failed");
|
||||
}
|
||||
}
|
||||
|
||||
async fn run(state: &AppState, item_id: Uuid) -> anyhow::Result<()> {
|
||||
let row: Option<NotifyRow> = sqlx::query_as(
|
||||
"SELECT i.title, i.title_fetched, i.url, i.current_price, i.target_price,
|
||||
i.currency, i.in_stock, i.notified_at,
|
||||
u.email, u.display_name, s.notify_email
|
||||
FROM items i
|
||||
JOIN lists l ON l.id = i.list_id
|
||||
JOIN users u ON u.id = l.user_id
|
||||
JOIN user_settings s ON s.user_id = u.id
|
||||
WHERE i.id = $1",
|
||||
)
|
||||
.bind(item_id)
|
||||
.fetch_optional(&state.pool)
|
||||
.await?;
|
||||
|
||||
let Some(row) = row else { return Ok(()) };
|
||||
|
||||
// Need both a watched price and a target to judge a drop.
|
||||
let (Some(price), Some(target)) = (row.current_price, row.target_price) else {
|
||||
return Ok(());
|
||||
};
|
||||
let on_sale = price <= target;
|
||||
|
||||
match (on_sale, row.notified_at.is_some()) {
|
||||
// Reached target and not yet announced → email + latch.
|
||||
(true, false) => {
|
||||
if row.notify_email {
|
||||
send(state, &row, price, target).await?;
|
||||
}
|
||||
// Latch even if the user has email off, so flipping it on later
|
||||
// doesn't replay an old drop. Re-arms when price climbs back up.
|
||||
sqlx::query("UPDATE items SET notified_at = now() WHERE id = $1")
|
||||
.bind(item_id)
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
}
|
||||
// Price rose back above target → clear the latch (re-arm).
|
||||
(false, true) => {
|
||||
sqlx::query("UPDATE items SET notified_at = NULL WHERE id = $1")
|
||||
.bind(item_id)
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send(
|
||||
state: &AppState,
|
||||
row: &NotifyRow,
|
||||
price: Decimal,
|
||||
target: Decimal,
|
||||
) -> anyhow::Result<()> {
|
||||
let name = row.title_fetched.as_deref().unwrap_or(&row.title);
|
||||
let cur = row.currency.as_deref().unwrap_or("EUR");
|
||||
let now = format!("{cur} {price:.2}");
|
||||
let goal = format!("{cur} {target:.2}");
|
||||
let stock = match row.in_stock {
|
||||
Some(false) => " (sold out for now — but the sign is given)",
|
||||
_ => "",
|
||||
};
|
||||
let greeting = match row.display_name.as_deref() {
|
||||
Some(n) if !n.is_empty() => format!("{n}, "),
|
||||
_ => String::new(),
|
||||
};
|
||||
let link = row.url.as_deref();
|
||||
|
||||
let subject = format!("✦ The price has fallen — {name}");
|
||||
|
||||
let mut text = format!(
|
||||
"{greeting}your vigil is rewarded.\n\n\
|
||||
{name} now asks {now}, at or beneath your target of {goal}{stock}.\n\n\
|
||||
The moment is upon you. Consume, and ascend.\n"
|
||||
);
|
||||
if let Some(l) = link {
|
||||
text.push_str(&format!("\nApproach the shrine: {l}\n"));
|
||||
}
|
||||
text.push_str("\n— consume·rs\n");
|
||||
|
||||
let link_html = link
|
||||
.map(|l| {
|
||||
format!("<p><a href=\"{l}\" style=\"color:#7c6cf0\">Approach the shrine ↗</a></p>")
|
||||
})
|
||||
.unwrap_or_default();
|
||||
let html = format!(
|
||||
"<div style=\"font-family:Georgia,serif;color:#1a1726\">\
|
||||
<p><em>{greeting}your vigil is rewarded.</em></p>\
|
||||
<p style=\"font-size:1.1em\"><strong>{name}</strong> now asks \
|
||||
<strong>{now}</strong>, at or beneath your target of {goal}{stock}.</p>\
|
||||
<p>The moment is upon you. Consume, and ascend.</p>\
|
||||
{link_html}\
|
||||
<p style=\"color:#8a849c\">— consume·rs</p>\
|
||||
</div>"
|
||||
);
|
||||
|
||||
state
|
||||
.mailer
|
||||
.send(&row.email, &subject, &text, &html)
|
||||
.await?;
|
||||
tracing::info!(to = %row.email, item = %name, "sent price-drop notification");
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,356 @@
|
||||
use axum::extract::{Path, State};
|
||||
use axum::routing::{get, post};
|
||||
use axum::{Json, Router};
|
||||
use rust_decimal::Decimal;
|
||||
use serde::Deserialize;
|
||||
use uuid::Uuid;
|
||||
use validator::Validate;
|
||||
|
||||
use crate::auth::session::AuthUser;
|
||||
use crate::error::{AppError, AppResult};
|
||||
use crate::models::{Item, List, PricePoint};
|
||||
use crate::state::AppState;
|
||||
use crate::worker;
|
||||
|
||||
pub fn router() -> Router<AppState> {
|
||||
Router::new()
|
||||
.route("/lists", get(list_lists).post(create_list))
|
||||
.route(
|
||||
"/lists/{id}",
|
||||
axum::routing::patch(update_list).delete(delete_list),
|
||||
)
|
||||
.route("/lists/{id}/items", get(list_items).post(create_item))
|
||||
.route(
|
||||
"/items/{id}",
|
||||
axum::routing::patch(update_item).delete(delete_item),
|
||||
)
|
||||
.route("/items/{id}/refetch", post(refetch_item))
|
||||
.route("/items/{id}/history", get(item_history))
|
||||
}
|
||||
|
||||
pub const ITEM_COLS: &str = "id, list_id, title, url, note, status::text AS status, target_price, \
|
||||
position, title_fetched, current_price, currency, image_url, in_stock, source, fetched_at, \
|
||||
track_enabled, last_error, checked_at, created_at, updated_at";
|
||||
|
||||
// Same columns, qualified with the `i` alias for use in UPDATE … FROM lists,
|
||||
// where bare `id`/`position`/`created_at` would be ambiguous across both tables.
|
||||
const ITEM_COLS_I: &str = "i.id, i.list_id, i.title, i.url, i.note, i.status::text AS status, \
|
||||
i.target_price, i.position, i.title_fetched, i.current_price, i.currency, i.image_url, \
|
||||
i.in_stock, i.source, i.fetched_at, i.track_enabled, i.last_error, i.checked_at, \
|
||||
i.created_at, i.updated_at";
|
||||
|
||||
const ALLOWED_STATUS: &[&str] = &["coveted", "acquired", "renounced"];
|
||||
|
||||
// ---- Lists ----------------------------------------------------------------
|
||||
|
||||
async fn list_lists(
|
||||
State(state): State<AppState>,
|
||||
AuthUser(user): AuthUser,
|
||||
) -> AppResult<Json<Vec<List>>> {
|
||||
let lists = sqlx::query_as::<_, List>(
|
||||
"SELECT id, user_id, name, emoji, description, position, created_at, updated_at
|
||||
FROM lists WHERE user_id = $1 ORDER BY position, created_at",
|
||||
)
|
||||
.bind(user.id)
|
||||
.fetch_all(&state.pool)
|
||||
.await?;
|
||||
Ok(Json(lists))
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Validate)]
|
||||
struct CreateListReq {
|
||||
#[validate(length(min = 1, max = 80, message = "name must be 1–80 chars"))]
|
||||
name: String,
|
||||
#[validate(length(max = 16))]
|
||||
emoji: Option<String>,
|
||||
#[validate(length(max = 500))]
|
||||
description: Option<String>,
|
||||
}
|
||||
|
||||
async fn create_list(
|
||||
State(state): State<AppState>,
|
||||
AuthUser(user): AuthUser,
|
||||
Json(req): Json<CreateListReq>,
|
||||
) -> AppResult<Json<List>> {
|
||||
req.validate()
|
||||
.map_err(|e| AppError::Validation(e.to_string()))?;
|
||||
|
||||
let list = sqlx::query_as::<_, List>(
|
||||
"INSERT INTO lists (user_id, name, emoji, description, position)
|
||||
VALUES ($1, $2, $3, $4,
|
||||
COALESCE((SELECT MAX(position) + 1 FROM lists WHERE user_id = $1), 0))
|
||||
RETURNING id, user_id, name, emoji, description, position, created_at, updated_at",
|
||||
)
|
||||
.bind(user.id)
|
||||
.bind(req.name.trim())
|
||||
.bind(opt_trim(req.emoji))
|
||||
.bind(opt_trim(req.description))
|
||||
.fetch_one(&state.pool)
|
||||
.await?;
|
||||
Ok(Json(list))
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Validate)]
|
||||
struct UpdateListReq {
|
||||
#[validate(length(min = 1, max = 80, message = "name must be 1–80 chars"))]
|
||||
name: Option<String>,
|
||||
#[validate(length(max = 16))]
|
||||
emoji: Option<String>,
|
||||
#[validate(length(max = 500))]
|
||||
description: Option<String>,
|
||||
position: Option<i32>,
|
||||
}
|
||||
|
||||
async fn update_list(
|
||||
State(state): State<AppState>,
|
||||
AuthUser(user): AuthUser,
|
||||
Path(id): Path<Uuid>,
|
||||
Json(req): Json<UpdateListReq>,
|
||||
) -> AppResult<Json<List>> {
|
||||
req.validate()
|
||||
.map_err(|e| AppError::Validation(e.to_string()))?;
|
||||
|
||||
let list = sqlx::query_as::<_, List>(
|
||||
"UPDATE lists SET
|
||||
name = COALESCE($3, name),
|
||||
emoji = COALESCE($4, emoji),
|
||||
description = COALESCE($5, description),
|
||||
position = COALESCE($6, position)
|
||||
WHERE id = $1 AND user_id = $2
|
||||
RETURNING id, user_id, name, emoji, description, position, created_at, updated_at",
|
||||
)
|
||||
.bind(id)
|
||||
.bind(user.id)
|
||||
.bind(req.name.map(|s| s.trim().to_string()))
|
||||
.bind(opt_trim(req.emoji))
|
||||
.bind(opt_trim(req.description))
|
||||
.bind(req.position)
|
||||
.fetch_optional(&state.pool)
|
||||
.await?
|
||||
.ok_or(AppError::NotFound)?;
|
||||
Ok(Json(list))
|
||||
}
|
||||
|
||||
async fn delete_list(
|
||||
State(state): State<AppState>,
|
||||
AuthUser(user): AuthUser,
|
||||
Path(id): Path<Uuid>,
|
||||
) -> AppResult<Json<serde_json::Value>> {
|
||||
let res = sqlx::query("DELETE FROM lists WHERE id = $1 AND user_id = $2")
|
||||
.bind(id)
|
||||
.bind(user.id)
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
if res.rows_affected() == 0 {
|
||||
return Err(AppError::NotFound);
|
||||
}
|
||||
Ok(Json(serde_json::json!({ "deleted": id })))
|
||||
}
|
||||
|
||||
// ---- Items ----------------------------------------------------------------
|
||||
|
||||
/// Confirm the list exists and belongs to the user. Returns NotFound otherwise.
|
||||
async fn assert_list_owner(state: &AppState, list_id: Uuid, user_id: Uuid) -> AppResult<()> {
|
||||
let owns = sqlx::query_scalar::<_, bool>(
|
||||
"SELECT EXISTS(SELECT 1 FROM lists WHERE id = $1 AND user_id = $2)",
|
||||
)
|
||||
.bind(list_id)
|
||||
.bind(user_id)
|
||||
.fetch_one(&state.pool)
|
||||
.await?;
|
||||
if owns {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(AppError::NotFound)
|
||||
}
|
||||
}
|
||||
|
||||
async fn list_items(
|
||||
State(state): State<AppState>,
|
||||
AuthUser(user): AuthUser,
|
||||
Path(list_id): Path<Uuid>,
|
||||
) -> AppResult<Json<Vec<Item>>> {
|
||||
assert_list_owner(&state, list_id, user.id).await?;
|
||||
let items = sqlx::query_as::<_, Item>(&format!(
|
||||
"SELECT {ITEM_COLS} FROM items WHERE list_id = $1 ORDER BY position, created_at"
|
||||
))
|
||||
.bind(list_id)
|
||||
.fetch_all(&state.pool)
|
||||
.await?;
|
||||
Ok(Json(items))
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Validate)]
|
||||
struct CreateItemReq {
|
||||
#[validate(length(min = 1, max = 200, message = "title must be 1–200 chars"))]
|
||||
title: String,
|
||||
#[validate(url(message = "url must be a valid URL"))]
|
||||
url: Option<String>,
|
||||
#[validate(length(max = 1000))]
|
||||
note: Option<String>,
|
||||
target_price: Option<Decimal>,
|
||||
}
|
||||
|
||||
async fn create_item(
|
||||
State(state): State<AppState>,
|
||||
AuthUser(user): AuthUser,
|
||||
Path(list_id): Path<Uuid>,
|
||||
Json(req): Json<CreateItemReq>,
|
||||
) -> AppResult<Json<Item>> {
|
||||
req.validate()
|
||||
.map_err(|e| AppError::Validation(e.to_string()))?;
|
||||
assert_list_owner(&state, list_id, user.id).await?;
|
||||
|
||||
let item = sqlx::query_as::<_, Item>(&format!(
|
||||
"INSERT INTO items (list_id, title, url, note, target_price, position)
|
||||
VALUES ($1, $2, $3, $4, $5,
|
||||
COALESCE((SELECT MAX(position) + 1 FROM items WHERE list_id = $1), 0))
|
||||
RETURNING {ITEM_COLS}"
|
||||
))
|
||||
.bind(list_id)
|
||||
.bind(req.title.trim())
|
||||
.bind(opt_trim(req.url))
|
||||
.bind(opt_trim(req.note))
|
||||
.bind(req.target_price)
|
||||
.fetch_one(&state.pool)
|
||||
.await?;
|
||||
Ok(Json(item))
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Validate)]
|
||||
struct UpdateItemReq {
|
||||
#[validate(length(min = 1, max = 200, message = "title must be 1–200 chars"))]
|
||||
title: Option<String>,
|
||||
#[validate(url(message = "url must be a valid URL"))]
|
||||
url: Option<String>,
|
||||
#[validate(length(max = 1000))]
|
||||
note: Option<String>,
|
||||
status: Option<String>,
|
||||
target_price: Option<Decimal>,
|
||||
position: Option<i32>,
|
||||
}
|
||||
|
||||
async fn update_item(
|
||||
State(state): State<AppState>,
|
||||
AuthUser(user): AuthUser,
|
||||
Path(id): Path<Uuid>,
|
||||
Json(req): Json<UpdateItemReq>,
|
||||
) -> AppResult<Json<Item>> {
|
||||
req.validate()
|
||||
.map_err(|e| AppError::Validation(e.to_string()))?;
|
||||
if let Some(s) = &req.status {
|
||||
if !ALLOWED_STATUS.contains(&s.as_str()) {
|
||||
return Err(AppError::Validation(format!("unknown status: {s}")));
|
||||
}
|
||||
}
|
||||
|
||||
// Ownership enforced via the join to lists.user_id.
|
||||
let item = sqlx::query_as::<_, Item>(&format!(
|
||||
"UPDATE items i SET
|
||||
title = COALESCE($3, i.title),
|
||||
url = COALESCE($4, i.url),
|
||||
note = COALESCE($5, i.note),
|
||||
status = COALESCE($6::item_status, i.status),
|
||||
target_price = COALESCE($7, i.target_price),
|
||||
position = COALESCE($8, i.position)
|
||||
FROM lists l
|
||||
WHERE i.id = $1 AND i.list_id = l.id AND l.user_id = $2
|
||||
RETURNING {ITEM_COLS_I}"
|
||||
))
|
||||
.bind(id)
|
||||
.bind(user.id)
|
||||
.bind(req.title.map(|s| s.trim().to_string()))
|
||||
.bind(opt_trim(req.url))
|
||||
.bind(opt_trim(req.note))
|
||||
.bind(req.status)
|
||||
.bind(req.target_price)
|
||||
.bind(req.position)
|
||||
.fetch_optional(&state.pool)
|
||||
.await?
|
||||
.ok_or(AppError::NotFound)?;
|
||||
Ok(Json(item))
|
||||
}
|
||||
|
||||
async fn delete_item(
|
||||
State(state): State<AppState>,
|
||||
AuthUser(user): AuthUser,
|
||||
Path(id): Path<Uuid>,
|
||||
) -> AppResult<Json<serde_json::Value>> {
|
||||
let res = sqlx::query(
|
||||
"DELETE FROM items i USING lists l
|
||||
WHERE i.id = $1 AND i.list_id = l.id AND l.user_id = $2",
|
||||
)
|
||||
.bind(id)
|
||||
.bind(user.id)
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
if res.rows_affected() == 0 {
|
||||
return Err(AppError::NotFound);
|
||||
}
|
||||
Ok(Json(serde_json::json!({ "deleted": id })))
|
||||
}
|
||||
|
||||
// ---- Tracking -------------------------------------------------------------
|
||||
|
||||
/// Owned item's URL, or NotFound. Inner Option is the (nullable) url.
|
||||
async fn owned_item_url(
|
||||
state: &AppState,
|
||||
item_id: Uuid,
|
||||
user_id: Uuid,
|
||||
) -> AppResult<Option<String>> {
|
||||
let row = sqlx::query_as::<_, (Option<String>,)>(
|
||||
"SELECT i.url FROM items i JOIN lists l ON l.id = i.list_id
|
||||
WHERE i.id = $1 AND l.user_id = $2",
|
||||
)
|
||||
.bind(item_id)
|
||||
.bind(user_id)
|
||||
.fetch_optional(&state.pool)
|
||||
.await?
|
||||
.ok_or(AppError::NotFound)?;
|
||||
Ok(row.0)
|
||||
}
|
||||
|
||||
/// Refetch a single item's price on demand. Surfaces fetch errors to the user.
|
||||
async fn refetch_item(
|
||||
State(state): State<AppState>,
|
||||
AuthUser(user): AuthUser,
|
||||
Path(id): Path<Uuid>,
|
||||
) -> AppResult<Json<Item>> {
|
||||
let url = owned_item_url(&state, id, user.id).await?.ok_or_else(|| {
|
||||
AppError::BadRequest("this temptation has no URL to keep vigil over".into())
|
||||
})?;
|
||||
|
||||
worker::refetch(&state, id, &url)
|
||||
.await
|
||||
.map_err(|e| AppError::BadRequest(e.to_string()))?;
|
||||
|
||||
let item = sqlx::query_as::<_, Item>(&format!("SELECT {ITEM_COLS} FROM items WHERE id = $1"))
|
||||
.bind(id)
|
||||
.fetch_one(&state.pool)
|
||||
.await?;
|
||||
Ok(Json(item))
|
||||
}
|
||||
|
||||
/// Price observations for an item, newest first.
|
||||
async fn item_history(
|
||||
State(state): State<AppState>,
|
||||
AuthUser(user): AuthUser,
|
||||
Path(id): Path<Uuid>,
|
||||
) -> AppResult<Json<Vec<PricePoint>>> {
|
||||
// Ownership: NotFound if the item isn't the user's.
|
||||
owned_item_url(&state, id, user.id).await?;
|
||||
|
||||
let history = sqlx::query_as::<_, PricePoint>(
|
||||
"SELECT price, currency, in_stock, fetched_at
|
||||
FROM price_history WHERE item_id = $1
|
||||
ORDER BY fetched_at DESC LIMIT 200",
|
||||
)
|
||||
.bind(id)
|
||||
.fetch_all(&state.pool)
|
||||
.await?;
|
||||
Ok(Json(history))
|
||||
}
|
||||
|
||||
fn opt_trim(s: Option<String>) -> Option<String> {
|
||||
s.map(|s| s.trim().to_string()).filter(|s| !s.is_empty())
|
||||
}
|
||||
@@ -10,11 +10,14 @@ use crate::error::{AppError, AppResult};
|
||||
use crate::models::UserSettings;
|
||||
use crate::state::AppState;
|
||||
|
||||
mod lists;
|
||||
|
||||
pub fn router() -> Router<AppState> {
|
||||
Router::new()
|
||||
.route("/health", get(health))
|
||||
.route("/settings", patch(update_settings))
|
||||
.route("/profile", patch(update_profile))
|
||||
.merge(lists::router())
|
||||
}
|
||||
|
||||
async fn health() -> Json<Value> {
|
||||
@@ -50,7 +53,9 @@ async fn update_settings(
|
||||
}
|
||||
if let Some(cur) = &req.currency {
|
||||
if cur.len() != 3 {
|
||||
return Err(AppError::Validation("currency must be a 3-letter code".into()));
|
||||
return Err(AppError::Validation(
|
||||
"currency must be a 3-letter code".into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -88,7 +93,11 @@ async fn update_profile(
|
||||
req.validate()
|
||||
.map_err(|e| AppError::Validation(e.to_string()))?;
|
||||
|
||||
let display = req.display_name.as_deref().map(str::trim).filter(|s| !s.is_empty());
|
||||
let display = req
|
||||
.display_name
|
||||
.as_deref()
|
||||
.map(str::trim)
|
||||
.filter(|s| !s.is_empty());
|
||||
sqlx::query("UPDATE users SET display_name = $2 WHERE id = $1")
|
||||
.bind(user.id)
|
||||
.bind(display)
|
||||
|
||||
@@ -11,4 +11,6 @@ pub struct AppState {
|
||||
pub pool: PgPool,
|
||||
pub config: Arc<Config>,
|
||||
pub mailer: Mailer,
|
||||
/// Shared outbound HTTP client for product fetches.
|
||||
pub http: reqwest::Client,
|
||||
}
|
||||
|
||||
@@ -0,0 +1,130 @@
|
||||
//! Background price-refetch worker. Periodically pulls product data for
|
||||
//! trackable items via the generic adapters in [`crate::fetch`], updates each
|
||||
//! item's metadata columns, and appends a `price_history` row on success.
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use sqlx::PgPool;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::fetch::{self, FetchedProduct};
|
||||
use crate::notify;
|
||||
use crate::state::AppState;
|
||||
|
||||
const BATCH: i64 = 20;
|
||||
|
||||
/// Spawn the periodic worker. A zero interval disables it.
|
||||
pub fn spawn(state: AppState) {
|
||||
let interval = state.config.refetch_interval_secs;
|
||||
if interval == 0 {
|
||||
tracing::info!("refetch worker disabled (REFETCH_INTERVAL_SECS=0)");
|
||||
return;
|
||||
}
|
||||
tokio::spawn(async move {
|
||||
let mut ticker = tokio::time::interval(Duration::from_secs(interval));
|
||||
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||
tracing::info!(interval_secs = interval, "refetch worker started");
|
||||
loop {
|
||||
ticker.tick().await;
|
||||
if let Err(e) = run_once(&state).await {
|
||||
tracing::error!(error = ?e, "refetch tick failed");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// One pass: refetch a batch of due items.
|
||||
async fn run_once(state: &AppState) -> anyhow::Result<()> {
|
||||
let due: Vec<(Uuid, String)> = sqlx::query_as(
|
||||
"SELECT id, url FROM items
|
||||
WHERE url IS NOT NULL AND track_enabled
|
||||
AND (checked_at IS NULL OR checked_at < now() - ($1 * interval '1 second'))
|
||||
ORDER BY checked_at NULLS FIRST
|
||||
LIMIT $2",
|
||||
)
|
||||
.bind(state.config.refetch_min_age_secs)
|
||||
.bind(BATCH)
|
||||
.fetch_all(&state.pool)
|
||||
.await?;
|
||||
|
||||
if due.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
tracing::debug!(count = due.len(), "refetching due items");
|
||||
|
||||
for (id, url) in due {
|
||||
if let Err(e) = refetch(state, id, &url).await {
|
||||
tracing::warn!(item = %id, error = %e, "item refetch failed");
|
||||
}
|
||||
// Be a polite guest on storefronts.
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Fetch one item and persist the outcome. Records `last_error` + `checked_at`
|
||||
/// on failure (and still returns `Err` so callers can surface it). On success,
|
||||
/// fires a price-drop notification if the item just reached its target price.
|
||||
pub async fn refetch(state: &AppState, item_id: Uuid, url: &str) -> anyhow::Result<()> {
|
||||
match fetch::fetch_product(&state.http, url, &state.config.default_currency).await {
|
||||
Ok(p) => {
|
||||
apply_success(&state.pool, item_id, &p).await?;
|
||||
notify::maybe_notify_drop(state, item_id).await;
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
let msg = e.to_string();
|
||||
apply_failure(&state.pool, item_id, &msg).await?;
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn apply_success(pool: &PgPool, item_id: Uuid, p: &FetchedProduct) -> anyhow::Result<()> {
|
||||
let mut tx = pool.begin().await?;
|
||||
sqlx::query(
|
||||
"UPDATE items SET
|
||||
title_fetched = $2,
|
||||
current_price = $3,
|
||||
currency = $4,
|
||||
image_url = COALESCE($5, image_url),
|
||||
in_stock = $6,
|
||||
source = $7,
|
||||
fetched_at = now(),
|
||||
checked_at = now(),
|
||||
last_error = NULL
|
||||
WHERE id = $1",
|
||||
)
|
||||
.bind(item_id)
|
||||
.bind(&p.title)
|
||||
.bind(p.price)
|
||||
.bind(&p.currency)
|
||||
.bind(p.image_url.as_deref())
|
||||
.bind(p.in_stock)
|
||||
.bind(p.source)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
sqlx::query(
|
||||
"INSERT INTO price_history (item_id, price, currency, in_stock)
|
||||
VALUES ($1, $2, $3, $4)",
|
||||
)
|
||||
.bind(item_id)
|
||||
.bind(p.price)
|
||||
.bind(&p.currency)
|
||||
.bind(p.in_stock)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
tx.commit().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn apply_failure(pool: &PgPool, item_id: Uuid, msg: &str) -> anyhow::Result<()> {
|
||||
sqlx::query("UPDATE items SET checked_at = now(), last_error = $2 WHERE id = $1")
|
||||
.bind(item_id)
|
||||
.bind(msg)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
Reference in New Issue
Block a user