added sharing and subscriptions

This commit is contained in:
2026-06-17 23:27:37 +02:00
parent 148e441425
commit 8a614cb1d1
16 changed files with 1019 additions and 26 deletions
+6
View File
@@ -0,0 +1,6 @@
-- Public read-only sharing for lists.
-- NULL share_token = private (default). A non-NULL token is an unguessable
-- secret: anyone holding it can view the list read-only at /api/shared/{token},
-- no account required. Revoking (unshare) sets it back to NULL.
ALTER TABLE lists
ADD COLUMN share_token TEXT UNIQUE;
+31
View File
@@ -0,0 +1,31 @@
-- Subscriptions: a logged-in user follows someone else's shared list or a
-- single shared item, to receive the same price-drop emails the owner gets.
-- Exactly one of list_id / item_id is set per row. A list subscription
-- implicitly covers every item on that list — present and future — expanded
-- at notify time rather than materialised per item.
CREATE TABLE subscriptions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
list_id UUID REFERENCES lists(id) ON DELETE CASCADE,
item_id UUID REFERENCES items(id) ON DELETE CASCADE,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
CONSTRAINT subscription_target_exactly_one
CHECK ((list_id IS NOT NULL) <> (item_id IS NOT NULL))
);
-- One subscription per (user, list) and per (user, item).
CREATE UNIQUE INDEX subscriptions_user_list
ON subscriptions (user_id, list_id) WHERE list_id IS NOT NULL;
CREATE UNIQUE INDEX subscriptions_user_item
ON subscriptions (user_id, item_id) WHERE item_id IS NOT NULL;
-- Per-subscriber, per-item de-dupe latch for target-price alerts — the
-- subscriber-side mirror of items.notified_at (which latches the owner).
-- Present = already announced this drop; absent = armed. Cleared when the
-- price climbs back above the item's target.
CREATE TABLE subscription_notify (
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
item_id UUID NOT NULL REFERENCES items(id) ON DELETE CASCADE,
notified_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (user_id, item_id)
);
+2
View File
@@ -54,6 +54,8 @@ pub struct List {
pub name: String,
pub emoji: Option<String>,
pub description: Option<String>,
/// Unguessable secret for public read-only sharing; None = private.
pub share_token: Option<String>,
pub position: i32,
#[serde(with = "time::serde::rfc3339")]
pub created_at: OffsetDateTime,
+136
View File
@@ -129,6 +129,142 @@ async fn any_drop(
Ok(())
}
// ---- Subscriber fan-out ---------------------------------------------------
/// Item fields needed to judge a drop, plus its list + owner for subscriber lookup.
#[derive(sqlx::FromRow)]
struct ItemFields {
title: String,
title_fetched: Option<String>,
url: Option<String>,
current_price: Option<Decimal>,
target_price: Option<Decimal>,
currency: Option<String>,
in_stock: Option<bool>,
list_id: Uuid,
owner_id: Uuid,
}
#[derive(sqlx::FromRow)]
struct Subscriber {
user_id: Uuid,
email: String,
display_name: Option<String>,
notify_email: bool,
}
/// Fan the same price-drop signal out to everyone subscribed to this item or to
/// its parent list (the owner is notified separately via [`maybe_notify_drop`]).
/// Best-effort: never errors the refetch.
pub async fn maybe_notify_subscribers(state: &AppState, item_id: Uuid) {
if let Err(e) = run_subs(state, item_id).await {
tracing::warn!(item = %item_id, error = %e, "subscriber notification failed");
}
}
async fn run_subs(state: &AppState, item_id: Uuid) -> anyhow::Result<()> {
let item: Option<ItemFields> = sqlx::query_as(
"SELECT i.title, i.title_fetched, i.url, i.current_price, i.target_price,
i.currency, i.in_stock, i.list_id, l.user_id AS owner_id
FROM items i JOIN lists l ON l.id = i.list_id
WHERE i.id = $1",
)
.bind(item_id)
.fetch_optional(&state.pool)
.await?;
let Some(item) = item else { return Ok(()) };
let Some(price) = item.current_price else {
return Ok(());
};
// Distinct subscribers reached via the item directly OR its parent list;
// never the owner (they get the owner-path email).
let subs: Vec<Subscriber> = sqlx::query_as(
"SELECT DISTINCT u.id AS user_id, u.email, u.display_name, st.notify_email
FROM users u
JOIN user_settings st ON st.user_id = u.id
JOIN subscriptions sub ON sub.user_id = u.id
WHERE (sub.item_id = $1 OR sub.list_id = $2) AND u.id <> $3",
)
.bind(item_id)
.bind(item.list_id)
.bind(item.owner_id)
.fetch_all(&state.pool)
.await?;
for s in subs {
let row = NotifyRow {
title: item.title.clone(),
title_fetched: item.title_fetched.clone(),
url: item.url.clone(),
current_price: item.current_price,
target_price: item.target_price,
currency: item.currency.clone(),
in_stock: item.in_stock,
notified_at: None, // subscriber latch lives in subscription_notify
email: s.email,
display_name: s.display_name,
notify_email: s.notify_email,
};
let res = match item.target_price {
Some(target) => target_drop_sub(state, s.user_id, item_id, &row, price, target).await,
// No target → reuse the owner's any-drop logic (latch-free; reads
// only shared price_history). Each subscriber gets their own email.
None => any_drop(state, item_id, &row, price).await,
};
if let Err(e) = res {
tracing::warn!(item = %item_id, user = %s.user_id, error = %e, "subscriber notify failed");
}
}
Ok(())
}
/// Subscriber target mode: same semantics as [`target_drop`], but the de-dupe
/// latch lives per-subscriber in `subscription_notify(user_id, item_id)`.
async fn target_drop_sub(
state: &AppState,
user_id: Uuid,
item_id: Uuid,
row: &NotifyRow,
price: Decimal,
target: Decimal,
) -> anyhow::Result<()> {
let latched: bool = sqlx::query_scalar(
"SELECT EXISTS(SELECT 1 FROM subscription_notify WHERE user_id = $1 AND item_id = $2)",
)
.bind(user_id)
.bind(item_id)
.fetch_one(&state.pool)
.await?;
let on_sale = price <= target;
match (on_sale, latched) {
(true, false) => {
if row.notify_email {
send(state, row, price, Some(target)).await?;
}
sqlx::query(
"INSERT INTO subscription_notify (user_id, item_id) VALUES ($1, $2)
ON CONFLICT (user_id, item_id) DO UPDATE SET notified_at = now()",
)
.bind(user_id)
.bind(item_id)
.execute(&state.pool)
.await?;
}
(false, true) => {
sqlx::query("DELETE FROM subscription_notify WHERE user_id = $1 AND item_id = $2")
.bind(user_id)
.bind(item_id)
.execute(&state.pool)
.await?;
}
_ => {}
}
Ok(())
}
async fn send(
state: &AppState,
row: &NotifyRow,
+82 -10
View File
@@ -19,6 +19,8 @@ pub fn router() -> Router<AppState> {
"/lists/{id}",
axum::routing::patch(update_list).delete(delete_list),
)
.route("/lists/{id}/share", post(share_list).delete(unshare_list))
.route("/shared/{token}", get(shared_view))
.route("/lists/{id}/items", get(list_items).post(create_item))
.route(
"/items/{id}",
@@ -39,6 +41,9 @@ const ITEM_COLS_I: &str = "i.id, i.list_id, i.title, i.url, i.note, i.status::te
i.in_stock, i.source, i.fetched_at, i.track_enabled, i.last_error, i.checked_at, \
i.created_at, i.updated_at";
const LIST_COLS: &str =
"id, user_id, name, emoji, description, share_token, position, created_at, updated_at";
const ALLOWED_STATUS: &[&str] = &["coveted", "acquired", "renounced"];
// ---- Lists ----------------------------------------------------------------
@@ -47,10 +52,10 @@ async fn list_lists(
State(state): State<AppState>,
AuthUser(user): AuthUser,
) -> AppResult<Json<Vec<List>>> {
let lists = sqlx::query_as::<_, List>(
"SELECT id, user_id, name, emoji, description, position, created_at, updated_at
FROM lists WHERE user_id = $1 ORDER BY position, created_at",
)
let lists = sqlx::query_as::<_, List>(&format!(
"SELECT {LIST_COLS}
FROM lists WHERE user_id = $1 ORDER BY position, created_at"
))
.bind(user.id)
.fetch_all(&state.pool)
.await?;
@@ -75,12 +80,12 @@ async fn create_list(
req.validate()
.map_err(|e| AppError::Validation(e.to_string()))?;
let list = sqlx::query_as::<_, List>(
let list = sqlx::query_as::<_, List>(&format!(
"INSERT INTO lists (user_id, name, emoji, description, position)
VALUES ($1, $2, $3, $4,
COALESCE((SELECT MAX(position) + 1 FROM lists WHERE user_id = $1), 0))
RETURNING id, user_id, name, emoji, description, position, created_at, updated_at",
)
RETURNING {LIST_COLS}"
))
.bind(user.id)
.bind(req.name.trim())
.bind(opt_trim(req.emoji))
@@ -110,15 +115,15 @@ async fn update_list(
req.validate()
.map_err(|e| AppError::Validation(e.to_string()))?;
let list = sqlx::query_as::<_, List>(
let list = sqlx::query_as::<_, List>(&format!(
"UPDATE lists SET
name = COALESCE($3, name),
emoji = COALESCE($4, emoji),
description = COALESCE($5, description),
position = COALESCE($6, position)
WHERE id = $1 AND user_id = $2
RETURNING id, user_id, name, emoji, description, position, created_at, updated_at",
)
RETURNING {LIST_COLS}"
))
.bind(id)
.bind(user.id)
.bind(req.name.map(|s| s.trim().to_string()))
@@ -147,6 +152,73 @@ async fn delete_list(
Ok(Json(serde_json::json!({ "deleted": id })))
}
// ---- Sharing --------------------------------------------------------------
/// Turn on public sharing: mint a token if the list doesn't have one yet
/// (idempotent — repeat calls keep the same link). Returns the updated list.
async fn share_list(
State(state): State<AppState>,
AuthUser(user): AuthUser,
Path(id): Path<Uuid>,
) -> AppResult<Json<List>> {
let token = Uuid::new_v4().simple().to_string();
let list = sqlx::query_as::<_, List>(&format!(
"UPDATE lists SET share_token = COALESCE(share_token, $3)
WHERE id = $1 AND user_id = $2
RETURNING {LIST_COLS}"
))
.bind(id)
.bind(user.id)
.bind(token)
.fetch_optional(&state.pool)
.await?
.ok_or(AppError::NotFound)?;
Ok(Json(list))
}
/// Revoke sharing: any existing link stops working immediately.
async fn unshare_list(
State(state): State<AppState>,
AuthUser(user): AuthUser,
Path(id): Path<Uuid>,
) -> AppResult<Json<List>> {
let list = sqlx::query_as::<_, List>(&format!(
"UPDATE lists SET share_token = NULL
WHERE id = $1 AND user_id = $2
RETURNING {LIST_COLS}"
))
.bind(id)
.bind(user.id)
.fetch_optional(&state.pool)
.await?
.ok_or(AppError::NotFound)?;
Ok(Json(list))
}
/// Public, unauthenticated read-only view of a shared list + its items.
/// No `AuthUser` extractor: holding the secret token is the only credential.
async fn shared_view(
State(state): State<AppState>,
Path(token): Path<String>,
) -> AppResult<Json<serde_json::Value>> {
let list = sqlx::query_as::<_, List>(&format!(
"SELECT {LIST_COLS} FROM lists WHERE share_token = $1"
))
.bind(&token)
.fetch_optional(&state.pool)
.await?
.ok_or(AppError::NotFound)?;
let items = sqlx::query_as::<_, Item>(&format!(
"SELECT {ITEM_COLS} FROM items WHERE list_id = $1 ORDER BY position, created_at"
))
.bind(list.id)
.fetch_all(&state.pool)
.await?;
Ok(Json(serde_json::json!({ "list": list, "items": items })))
}
// ---- Items ----------------------------------------------------------------
/// Confirm the list exists and belongs to the user. Returns NotFound otherwise.
+2
View File
@@ -11,6 +11,7 @@ use crate::models::UserSettings;
use crate::state::AppState;
mod lists;
mod subs;
pub fn router() -> Router<AppState> {
Router::new()
@@ -18,6 +19,7 @@ pub fn router() -> Router<AppState> {
.route("/settings", patch(update_settings))
.route("/profile", patch(update_profile))
.merge(lists::router())
.merge(subs::router())
}
async fn health() -> Json<Value> {
+181
View File
@@ -0,0 +1,181 @@
//! Subscriptions: a logged-in user follows another user's shared list or item
//! to receive the same price-drop emails. Subscribing requires the target to be
//! currently shared (`lists.share_token IS NOT NULL`) and not the user's own.
use axum::extract::{Path, State};
use axum::routing::get;
use axum::{Json, Router};
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use uuid::Uuid;
use crate::auth::session::AuthUser;
use crate::error::{AppError, AppResult};
use crate::state::AppState;
pub fn router() -> Router<AppState> {
Router::new()
.route("/subscriptions", get(list_subscriptions).post(subscribe))
.route("/subscriptions/{id}", axum::routing::delete(unsubscribe))
}
/// One subscription, enriched with the followed list/item's display fields so
/// the /subscriptions page can render without extra round-trips.
#[derive(Debug, Serialize, sqlx::FromRow)]
struct SubscriptionView {
id: Uuid,
kind: String, // "list" | "item"
#[serde(with = "time::serde::rfc3339")]
created_at: OffsetDateTime,
list_id: Option<Uuid>,
item_id: Option<Uuid>,
title: String,
emoji: Option<String>,
share_token: Option<String>,
url: Option<String>,
image_url: Option<String>,
current_price: Option<Decimal>,
currency: Option<String>,
in_stock: Option<bool>,
target_price: Option<Decimal>,
}
async fn list_subscriptions(
State(state): State<AppState>,
AuthUser(user): AuthUser,
) -> AppResult<Json<Vec<SubscriptionView>>> {
// List subs + item subs, unioned and enriched. Explicit casts keep the two
// SELECT arms type-compatible across the UNION.
let subs = sqlx::query_as::<_, SubscriptionView>(
"SELECT sub.id, 'list' AS kind, sub.created_at,
l.id AS list_id, NULL::uuid AS item_id,
l.name AS title, l.emoji, l.share_token,
NULL::text AS url, NULL::text AS image_url,
NULL::numeric AS current_price, NULL::text AS currency,
NULL::boolean AS in_stock, NULL::numeric AS target_price
FROM subscriptions sub
JOIN lists l ON l.id = sub.list_id
WHERE sub.user_id = $1 AND sub.list_id IS NOT NULL
UNION ALL
SELECT sub.id, 'item' AS kind, sub.created_at,
l.id AS list_id, i.id AS item_id,
COALESCE(i.title_fetched, i.title) AS title, l.emoji, l.share_token,
i.url, i.image_url,
i.current_price, i.currency,
i.in_stock, i.target_price
FROM subscriptions sub
JOIN items i ON i.id = sub.item_id
JOIN lists l ON l.id = i.list_id
WHERE sub.user_id = $1 AND sub.item_id IS NOT NULL
ORDER BY created_at DESC",
)
.bind(user.id)
.fetch_all(&state.pool)
.await?;
Ok(Json(subs))
}
#[derive(Debug, Deserialize)]
struct SubscribeReq {
list_id: Option<Uuid>,
item_id: Option<Uuid>,
}
async fn subscribe(
State(state): State<AppState>,
AuthUser(user): AuthUser,
Json(req): Json<SubscribeReq>,
) -> AppResult<Json<serde_json::Value>> {
let id = match (req.list_id, req.item_id) {
(Some(list_id), None) => subscribe_list(&state, user.id, list_id).await?,
(None, Some(item_id)) => subscribe_item(&state, user.id, item_id).await?,
_ => {
return Err(AppError::Validation(
"provide exactly one of list_id or item_id".into(),
))
}
};
Ok(Json(serde_json::json!({ "id": id })))
}
async fn subscribe_list(state: &AppState, user_id: Uuid, list_id: Uuid) -> AppResult<Uuid> {
let ok = sqlx::query_scalar::<_, bool>(
"SELECT EXISTS(SELECT 1 FROM lists
WHERE id = $1 AND share_token IS NOT NULL AND user_id <> $2)",
)
.bind(list_id)
.bind(user_id)
.fetch_one(&state.pool)
.await?;
if !ok {
return Err(AppError::NotFound);
}
// Idempotent: the partial unique index guards against duplicates.
sqlx::query(
"INSERT INTO subscriptions (user_id, list_id) SELECT $1, $2
WHERE NOT EXISTS (SELECT 1 FROM subscriptions WHERE user_id = $1 AND list_id = $2)",
)
.bind(user_id)
.bind(list_id)
.execute(&state.pool)
.await?;
let id = sqlx::query_scalar::<_, Uuid>(
"SELECT id FROM subscriptions WHERE user_id = $1 AND list_id = $2",
)
.bind(user_id)
.bind(list_id)
.fetch_one(&state.pool)
.await?;
Ok(id)
}
async fn subscribe_item(state: &AppState, user_id: Uuid, item_id: Uuid) -> AppResult<Uuid> {
let ok = sqlx::query_scalar::<_, bool>(
"SELECT EXISTS(SELECT 1 FROM items i JOIN lists l ON l.id = i.list_id
WHERE i.id = $1 AND l.share_token IS NOT NULL AND l.user_id <> $2)",
)
.bind(item_id)
.bind(user_id)
.fetch_one(&state.pool)
.await?;
if !ok {
return Err(AppError::NotFound);
}
sqlx::query(
"INSERT INTO subscriptions (user_id, item_id) SELECT $1, $2
WHERE NOT EXISTS (SELECT 1 FROM subscriptions WHERE user_id = $1 AND item_id = $2)",
)
.bind(user_id)
.bind(item_id)
.execute(&state.pool)
.await?;
let id = sqlx::query_scalar::<_, Uuid>(
"SELECT id FROM subscriptions WHERE user_id = $1 AND item_id = $2",
)
.bind(user_id)
.bind(item_id)
.fetch_one(&state.pool)
.await?;
Ok(id)
}
async fn unsubscribe(
State(state): State<AppState>,
AuthUser(user): AuthUser,
Path(id): Path<Uuid>,
) -> AppResult<Json<serde_json::Value>> {
let res = sqlx::query("DELETE FROM subscriptions WHERE id = $1 AND user_id = $2")
.bind(id)
.bind(user.id)
.execute(&state.pool)
.await?;
if res.rows_affected() == 0 {
return Err(AppError::NotFound);
}
Ok(Json(serde_json::json!({ "deleted": id })))
}
+1
View File
@@ -70,6 +70,7 @@ pub async fn refetch(state: &AppState, item_id: Uuid, url: &str) -> anyhow::Resu
Ok(p) => {
apply_success(&state.pool, item_id, &p).await?;
notify::maybe_notify_drop(state, item_id).await;
notify::maybe_notify_subscribers(state, item_id).await;
Ok(())
}
Err(e) => {