Compare commits

...

3 Commits

Author SHA1 Message Date
Greg Shuflin
d7882b0291 Pull in feeds on demo startup 2025-02-05 22:39:37 -08:00
Greg Shuflin
08994856c3 write demo feeds to db 2025-02-05 21:36:42 -08:00
Greg Shuflin
89982755a0 Fetch demo feeds 2025-02-05 21:00:51 -08:00
6 changed files with 64 additions and 20 deletions

View File

@ -1,6 +1,6 @@
{
"db_name": "SQLite",
"query": "\n SELECT \n id as \"id!: String\",\n local_id as \"local_id!: String\",\n title,\n published as \"published: Option<chrono::DateTime<Utc>>\",\n updated as \"updated: Option<chrono::DateTime<Utc>>\",\n summary,\n content,\n link,\n marked_read as \"marked_read: Option<chrono::DateTime<Utc>>\"\n FROM feed_entries \n WHERE feed_id = ?\n ORDER BY published DESC NULLS LAST\n LIMIT ?\n ",
"query": "\n SELECT \n id as \"id!: String\",\n local_id as \"local_id!: String\",\n title,\n published as \"published: Option<chrono::DateTime<Utc>>\",\n updated as \"updated: Option<chrono::DateTime<Utc>>\",\n summary,\n content,\n link,\n marked_read as \"marked_read: Option<chrono::DateTime<Utc>>\"\n FROM feed_entries\n WHERE feed_id = ?\n ORDER BY published DESC NULLS LAST\n LIMIT ?\n ",
"describe": {
"columns": [
{
@ -64,5 +64,5 @@
true
]
},
"hash": "2bbed6f20243ced25ac9359afefafb5ddffdff949250e0e4e35bf399fc0199fc"
"hash": "6186c55b12f42931e31a9f8367d6aa2bd637091ca9e1b8d9459241ca9488882c"
}

14
Cargo.lock generated
View File

@ -923,6 +923,7 @@ checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
@ -973,6 +974,17 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
[[package]]
name = "futures-macro"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.96",
]
[[package]]
name = "futures-sink"
version = "0.3.31"
@ -994,6 +1006,7 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
@ -2668,6 +2681,7 @@ dependencies = [
"chrono",
"clap",
"feed-rs",
"futures",
"getrandom 0.2.15",
"reqwest",
"rocket",

View File

@ -24,3 +24,4 @@ base64 = "0.21"
getrandom = "0.2"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
futures = "0.3.31"

View File

@ -1,6 +1,7 @@
use crate::feeds::Feed;
use crate::poll_utils::fetch_new_entries;
use crate::poll_utils::{fetch_new_entries, update_entry_db};
use crate::user::User;
use futures::future::join_all;
use tracing::{info, warn};
struct DemoFeed {
@ -9,7 +10,7 @@ struct DemoFeed {
category: Option<&'static str>,
}
const DEMO_FEEDS: [DemoFeed; 5] = [
const DEMO_FEEDS: [DemoFeed; 6] = [
DemoFeed {
name: "XKCD",
url: "https://xkcd.com/atom.xml",
@ -30,6 +31,11 @@ const DEMO_FEEDS: [DemoFeed; 5] = [
url: "https://feeds.bbci.co.uk/news/world/us_and_canada/rss.xml",
category: Some("News"),
},
DemoFeed {
name: "Lines and Colors",
url: "https://linesandcolors.com/feed/",
category: None,
},
DemoFeed {
name: "Astronomy Picture of the Day (APOD)",
url: "https://apod.nasa.gov/apod.rss",
@ -83,17 +89,36 @@ pub async fn setup_demo_data(pool: &sqlx::SqlitePool) {
.expect("Failed to create demo feed");
}
for feed in feeds.iter() {
let url = &feed.url;
// Fetch all feeds in parallel
let fetch_futures: Vec<_> = feeds
.iter()
.map(|feed| {
let url = feed.url.clone();
let feed_id = feed.feed_id;
async move {
let result = fetch_new_entries(&url).await;
(feed_id, url, result)
}
})
.collect();
let entries = match fetch_new_entries(url).await {
Ok(entries) => entries,
let results = join_all(fetch_futures).await;
for (feed_id, url, result) in results {
match result {
Ok(entries) => {
info!(
feed_url = url.as_str(),
"Successfully fetched {} entries",
entries.len()
);
if let Err(e) = update_entry_db(&entries, &feed_id, pool).await {
warn!(error=%e, feed_url=url.as_str(), "Failed to store entries in database");
}
}
Err(e) => {
warn!(error=%e, feed_url=url.as_str(), "Error populating feed");
continue;
}
};
//update_entry_db(&entries, &feed_id, &mut db).await?;
}
}
info!("Successfully set up demo data");

View File

@ -27,6 +27,7 @@ pub struct EntryStateUpdate {
}
async fn read_entries(feed_id: &Uuid, db: &mut SqliteConnection) -> Result<Vec<Entry>, Status> {
let feed_id_str = feed_id.to_string();
let rows = sqlx::query!(
r#"
SELECT
@ -39,12 +40,12 @@ async fn read_entries(feed_id: &Uuid, db: &mut SqliteConnection) -> Result<Vec<E
content,
link,
marked_read as "marked_read: Option<chrono::DateTime<Utc>>"
FROM feed_entries
FROM feed_entries
WHERE feed_id = ?
ORDER BY published DESC NULLS LAST
LIMIT ?
"#,
feed_id,
feed_id_str,
MAX_ENTRIES_PER_FEED,
)
.fetch_all(db)
@ -107,11 +108,11 @@ pub async fn poll_feed(
info!("Feed {} last checked: {}", feed_id, last_checked);
let entries = if last_checked < POLLING_INTERVAL {
info!("Reading entries from database for feed {}", feed_id);
read_entries(&feed_id, &mut db).await?
read_entries(&feed_id, &mut **db).await?
} else {
info!("Fetching new entries for feed {}", feed_id);
let entries = crate::poll_utils::fetch_new_entries(&url).await?;
update_entry_db(&entries, &feed_id, &mut db).await?;
update_entry_db(&entries, &feed_id, &mut **db).await?;
entries
};

View File

@ -3,7 +3,7 @@ use chrono::{DateTime, Utc};
use feed_rs::model::Text;
use rocket::http::Status;
use rocket::serde::{json, uuid::Uuid, Serialize};
use sqlx::{Acquire, SqliteConnection};
use sqlx::{Acquire, Executor};
use url::Url;
#[derive(Debug, Serialize)]
@ -50,13 +50,16 @@ pub async fn fetch_new_entries(url: &Url) -> Result<Vec<Entry>, Status> {
Ok(entries)
}
pub async fn update_entry_db(
pub async fn update_entry_db<'a, E>(
entries: &Vec<Entry>,
feed_id: &Uuid,
db: &mut SqliteConnection,
) -> Result<(), Status> {
executor: E,
) -> Result<(), Status>
where
E: Executor<'a, Database = sqlx::Sqlite> + Acquire<'a, Database = sqlx::Sqlite>,
{
// Start a transaction for batch update
let mut tx = db.begin().await.map_err(|e| {
let mut tx = executor.begin().await.map_err(|e| {
error!("Failed to start transaction: {}", e);
Status::InternalServerError
})?;