161 lines
4.8 KiB
Rust
161 lines
4.8 KiB
Rust
use crate::user::AuthenticatedUser;
|
|
use crate::{feed_utils::fetch_feed, Db};
|
|
use chrono::{DateTime, Duration, Utc};
|
|
use feed_rs::model::Text;
|
|
use rocket::http::Status;
|
|
use rocket::serde::uuid::Uuid;
|
|
use rocket::serde::{self, json::Json, Serialize};
|
|
use rocket_db_pools::Connection;
|
|
use sqlx::{Acquire, SqliteConnection};
|
|
use url::Url;
|
|
|
|
/// Window after a feed's `last_checked_time` during which `poll_feed` treats the feed as
/// "checked recently" (20 minutes).
const POLLING_INTERVAL: Duration = Duration::minutes(20);
|
|
|
|
#[derive(Debug, Serialize)]
|
|
#[serde(crate = "rocket::serde")]
|
|
pub struct FeedPollResponse {
|
|
count: usize,
|
|
entries: Vec<Entry>,
|
|
}
|
|
|
|
#[derive(Debug, Serialize)]
|
|
#[serde(crate = "rocket::serde")]
|
|
struct Entry {
|
|
id: Uuid,
|
|
entry_id: String,
|
|
title: String,
|
|
published: Option<DateTime<Utc>>,
|
|
updated: Option<DateTime<Utc>>,
|
|
summary: String,
|
|
content: Option<feed_rs::model::Content>,
|
|
link: Option<String>,
|
|
}
|
|
|
|
async fn update_entry_db(
|
|
entries: &Vec<Entry>,
|
|
feed_id: &str,
|
|
db: &mut SqliteConnection,
|
|
) -> Result<(), Status> {
|
|
// Start a transaction for batch update
|
|
let mut tx = db.begin().await.map_err(|e| {
|
|
eprintln!("Failed to start transaction: {}", e);
|
|
Status::InternalServerError
|
|
})?;
|
|
|
|
let now = Utc::now().to_rfc3339();
|
|
for entry in entries {
|
|
let content_json = if let Some(content) = &entry.content {
|
|
serde::json::to_string(content).ok()
|
|
} else {
|
|
None
|
|
};
|
|
|
|
let result = sqlx::query(
|
|
r#"
|
|
INSERT INTO feed_entries (
|
|
id, feed_id, entry_id, title, published, updated, summary, content, link, created_at
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT (feed_id, id) DO UPDATE SET
|
|
title = excluded.title,
|
|
published = excluded.published,
|
|
updated = excluded.updated,
|
|
summary = excluded.summary,
|
|
content = excluded.content,
|
|
link = excluded.link
|
|
"#,
|
|
)
|
|
.bind(&entry.id)
|
|
.bind(feed_id)
|
|
.bind(&entry.entry_id)
|
|
.bind(&entry.title)
|
|
.bind(entry.published.map(|dt| dt.to_rfc3339()))
|
|
.bind(entry.updated.map(|dt| dt.to_rfc3339()))
|
|
.bind(&entry.summary)
|
|
.bind(content_json)
|
|
.bind(&entry.link)
|
|
.bind(&now)
|
|
.execute(&mut *tx)
|
|
.await;
|
|
|
|
if let Err(e) = result {
|
|
eprintln!("Failed to save feed entry: {}", e);
|
|
tx.rollback().await.map_err(|e| {
|
|
eprintln!("Failed to rollback transaction: {}", e);
|
|
Status::InternalServerError
|
|
})?;
|
|
return Err(Status::InternalServerError);
|
|
}
|
|
}
|
|
|
|
// Commit the transaction
|
|
tx.commit().await.map_err(|e| {
|
|
eprintln!("Failed to commit transaction: {}", e);
|
|
Status::InternalServerError
|
|
})?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Perform the request to fetch from the remote feed url
|
|
async fn fetch_new_entries(url: &Url) -> Result<Vec<Entry>, Status> {
|
|
let feed_data = fetch_feed(url).await.map_err(|_| Status::BadGateway)?;
|
|
|
|
fn get(item: Option<Text>, name: &'static str) -> String {
|
|
item.map(|t| t.content.to_string())
|
|
.unwrap_or(format!("<no {name}>"))
|
|
}
|
|
|
|
let entries: Vec<Entry> = feed_data
|
|
.entries
|
|
.into_iter()
|
|
.map(|feed_entry| Entry {
|
|
id: Uuid::new_v4(),
|
|
entry_id: feed_entry.id,
|
|
title: get(feed_entry.title, "title"),
|
|
published: feed_entry.published,
|
|
updated: feed_entry.updated,
|
|
summary: get(feed_entry.summary, "summary"),
|
|
content: feed_entry.content,
|
|
link: feed_entry.links.first().map(|l| l.href.clone()),
|
|
})
|
|
.collect();
|
|
Ok(entries)
|
|
}
|
|
|
|
#[post("/poll/<feed_id>")]
|
|
pub async fn poll_feed(
|
|
mut db: Connection<Db>,
|
|
feed_id: Uuid,
|
|
user: AuthenticatedUser,
|
|
) -> Result<Json<FeedPollResponse>, Status> {
|
|
let feed_id = feed_id.to_string();
|
|
let user_id = user.user_id.to_string();
|
|
|
|
// Get the feed URL from the database, ensuring it belongs to the authenticated user
|
|
let feed = sqlx::query!(
|
|
r#"SELECT url, last_checked_time as "last_checked_time: chrono::DateTime<chrono::Utc>" FROM feeds WHERE feed_id = ? AND user_id = ?"#,
|
|
feed_id,
|
|
user_id
|
|
)
|
|
.fetch_optional(&mut **db)
|
|
.await
|
|
.map_err(|_| Status::InternalServerError)?
|
|
.ok_or(Status::NotFound)?;
|
|
|
|
let url = url::Url::parse(&feed.url).map_err(|_| Status::InternalServerError)?;
|
|
|
|
let now = Utc::now();
|
|
if now - feed.last_checked_time < POLLING_INTERVAL {
|
|
println!(
|
|
"Feed {} was checked recently at {}",
|
|
feed_id, feed.last_checked_time
|
|
);
|
|
}
|
|
|
|
let entries = fetch_new_entries(&url).await?;
|
|
let count = entries.len();
|
|
update_entry_db(&entries, &feed_id, &mut db).await?;
|
|
|
|
Ok(Json(FeedPollResponse { count, entries }))
|
|
}
|