Compare commits
3 Commits
c8301d14e1
...
d7882b0291
Author | SHA1 | Date | |
---|---|---|---|
|
d7882b0291 | ||
|
08994856c3 | ||
|
89982755a0 |
@ -1,6 +1,6 @@
|
||||
{
|
||||
"db_name": "SQLite",
|
||||
"query": "\n SELECT \n id as \"id!: String\",\n local_id as \"local_id!: String\",\n title,\n published as \"published: Option<chrono::DateTime<Utc>>\",\n updated as \"updated: Option<chrono::DateTime<Utc>>\",\n summary,\n content,\n link,\n marked_read as \"marked_read: Option<chrono::DateTime<Utc>>\"\n FROM feed_entries \n WHERE feed_id = ?\n ORDER BY published DESC NULLS LAST\n LIMIT ?\n ",
|
||||
"query": "\n SELECT \n id as \"id!: String\",\n local_id as \"local_id!: String\",\n title,\n published as \"published: Option<chrono::DateTime<Utc>>\",\n updated as \"updated: Option<chrono::DateTime<Utc>>\",\n summary,\n content,\n link,\n marked_read as \"marked_read: Option<chrono::DateTime<Utc>>\"\n FROM feed_entries\n WHERE feed_id = ?\n ORDER BY published DESC NULLS LAST\n LIMIT ?\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
@ -64,5 +64,5 @@
|
||||
true
|
||||
]
|
||||
},
|
||||
"hash": "2bbed6f20243ced25ac9359afefafb5ddffdff949250e0e4e35bf399fc0199fc"
|
||||
"hash": "6186c55b12f42931e31a9f8367d6aa2bd637091ca9e1b8d9459241ca9488882c"
|
||||
}
|
14
Cargo.lock
generated
14
Cargo.lock
generated
@ -923,6 +923,7 @@ checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
|
||||
dependencies = [
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"futures-executor",
|
||||
"futures-io",
|
||||
"futures-sink",
|
||||
"futures-task",
|
||||
@ -973,6 +974,17 @@ version = "0.3.31"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
|
||||
|
||||
[[package]]
|
||||
name = "futures-macro"
|
||||
version = "0.3.31"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.96",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-sink"
|
||||
version = "0.3.31"
|
||||
@ -994,6 +1006,7 @@ dependencies = [
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"futures-io",
|
||||
"futures-macro",
|
||||
"futures-sink",
|
||||
"futures-task",
|
||||
"memchr",
|
||||
@ -2668,6 +2681,7 @@ dependencies = [
|
||||
"chrono",
|
||||
"clap",
|
||||
"feed-rs",
|
||||
"futures",
|
||||
"getrandom 0.2.15",
|
||||
"reqwest",
|
||||
"rocket",
|
||||
|
@ -24,3 +24,4 @@ base64 = "0.21"
|
||||
getrandom = "0.2"
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
futures = "0.3.31"
|
||||
|
43
src/demo.rs
43
src/demo.rs
@ -1,6 +1,7 @@
|
||||
use crate::feeds::Feed;
|
||||
use crate::poll_utils::fetch_new_entries;
|
||||
use crate::poll_utils::{fetch_new_entries, update_entry_db};
|
||||
use crate::user::User;
|
||||
use futures::future::join_all;
|
||||
use tracing::{info, warn};
|
||||
|
||||
struct DemoFeed {
|
||||
@ -9,7 +10,7 @@ struct DemoFeed {
|
||||
category: Option<&'static str>,
|
||||
}
|
||||
|
||||
const DEMO_FEEDS: [DemoFeed; 5] = [
|
||||
const DEMO_FEEDS: [DemoFeed; 6] = [
|
||||
DemoFeed {
|
||||
name: "XKCD",
|
||||
url: "https://xkcd.com/atom.xml",
|
||||
@ -30,6 +31,11 @@ const DEMO_FEEDS: [DemoFeed; 5] = [
|
||||
url: "https://feeds.bbci.co.uk/news/world/us_and_canada/rss.xml",
|
||||
category: Some("News"),
|
||||
},
|
||||
DemoFeed {
|
||||
name: "Lines and Colors",
|
||||
url: "https://linesandcolors.com/feed/",
|
||||
category: None,
|
||||
},
|
||||
DemoFeed {
|
||||
name: "Astronomy Picture of the Day (APOD)",
|
||||
url: "https://apod.nasa.gov/apod.rss",
|
||||
@ -83,17 +89,36 @@ pub async fn setup_demo_data(pool: &sqlx::SqlitePool) {
|
||||
.expect("Failed to create demo feed");
|
||||
}
|
||||
|
||||
for feed in feeds.iter() {
|
||||
let url = &feed.url;
|
||||
// Fetch all feeds in parallel
|
||||
let fetch_futures: Vec<_> = feeds
|
||||
.iter()
|
||||
.map(|feed| {
|
||||
let url = feed.url.clone();
|
||||
let feed_id = feed.feed_id;
|
||||
async move {
|
||||
let result = fetch_new_entries(&url).await;
|
||||
(feed_id, url, result)
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
let entries = match fetch_new_entries(url).await {
|
||||
Ok(entries) => entries,
|
||||
let results = join_all(fetch_futures).await;
|
||||
for (feed_id, url, result) in results {
|
||||
match result {
|
||||
Ok(entries) => {
|
||||
info!(
|
||||
feed_url = url.as_str(),
|
||||
"Successfully fetched {} entries",
|
||||
entries.len()
|
||||
);
|
||||
if let Err(e) = update_entry_db(&entries, &feed_id, pool).await {
|
||||
warn!(error=%e, feed_url=url.as_str(), "Failed to store entries in database");
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(error=%e, feed_url=url.as_str(), "Error populating feed");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
//update_entry_db(&entries, &feed_id, &mut db).await?;
|
||||
}
|
||||
}
|
||||
|
||||
info!("Successfully set up demo data");
|
||||
|
@ -27,6 +27,7 @@ pub struct EntryStateUpdate {
|
||||
}
|
||||
|
||||
async fn read_entries(feed_id: &Uuid, db: &mut SqliteConnection) -> Result<Vec<Entry>, Status> {
|
||||
let feed_id_str = feed_id.to_string();
|
||||
let rows = sqlx::query!(
|
||||
r#"
|
||||
SELECT
|
||||
@ -39,12 +40,12 @@ async fn read_entries(feed_id: &Uuid, db: &mut SqliteConnection) -> Result<Vec<E
|
||||
content,
|
||||
link,
|
||||
marked_read as "marked_read: Option<chrono::DateTime<Utc>>"
|
||||
FROM feed_entries
|
||||
FROM feed_entries
|
||||
WHERE feed_id = ?
|
||||
ORDER BY published DESC NULLS LAST
|
||||
LIMIT ?
|
||||
"#,
|
||||
feed_id,
|
||||
feed_id_str,
|
||||
MAX_ENTRIES_PER_FEED,
|
||||
)
|
||||
.fetch_all(db)
|
||||
@ -107,11 +108,11 @@ pub async fn poll_feed(
|
||||
info!("Feed {} last checked: {}", feed_id, last_checked);
|
||||
let entries = if last_checked < POLLING_INTERVAL {
|
||||
info!("Reading entries from database for feed {}", feed_id);
|
||||
read_entries(&feed_id, &mut db).await?
|
||||
read_entries(&feed_id, &mut **db).await?
|
||||
} else {
|
||||
info!("Fetching new entries for feed {}", feed_id);
|
||||
let entries = crate::poll_utils::fetch_new_entries(&url).await?;
|
||||
update_entry_db(&entries, &feed_id, &mut db).await?;
|
||||
update_entry_db(&entries, &feed_id, &mut **db).await?;
|
||||
entries
|
||||
};
|
||||
|
||||
|
@ -3,7 +3,7 @@ use chrono::{DateTime, Utc};
|
||||
use feed_rs::model::Text;
|
||||
use rocket::http::Status;
|
||||
use rocket::serde::{json, uuid::Uuid, Serialize};
|
||||
use sqlx::{Acquire, SqliteConnection};
|
||||
use sqlx::{Acquire, Executor};
|
||||
use url::Url;
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
@ -50,13 +50,16 @@ pub async fn fetch_new_entries(url: &Url) -> Result<Vec<Entry>, Status> {
|
||||
Ok(entries)
|
||||
}
|
||||
|
||||
pub async fn update_entry_db(
|
||||
pub async fn update_entry_db<'a, E>(
|
||||
entries: &Vec<Entry>,
|
||||
feed_id: &Uuid,
|
||||
db: &mut SqliteConnection,
|
||||
) -> Result<(), Status> {
|
||||
executor: E,
|
||||
) -> Result<(), Status>
|
||||
where
|
||||
E: Executor<'a, Database = sqlx::Sqlite> + Acquire<'a, Database = sqlx::Sqlite>,
|
||||
{
|
||||
// Start a transaction for batch update
|
||||
let mut tx = db.begin().await.map_err(|e| {
|
||||
let mut tx = executor.begin().await.map_err(|e| {
|
||||
error!("Failed to start transaction: {}", e);
|
||||
Status::InternalServerError
|
||||
})?;
|
||||
|
Loading…
x
Reference in New Issue
Block a user