Skip to content

Commit

Permalink
Use pooled db connections for osm sync
Browse files Browse the repository at this point in the history
  • Loading branch information
bubelov committed Mar 10, 2025
1 parent ac410d5 commit d97967c
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 114 deletions.
8 changes: 8 additions & 0 deletions src/area_element/service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::model::AreaElement;
use crate::{area::Area, element::Element};
use crate::{element, Result};
use deadpool_sqlite::Pool;
use geo::{Contains, LineString, MultiPolygon, Polygon};
use geojson::Geometry;
use rusqlite::Connection;
Expand All @@ -15,6 +16,13 @@ pub struct Diff {
pub removed_areas: Vec<i64>,
}

pub async fn generate_mapping_async(elements: Vec<Element>, pool: &Pool) -> Result<Vec<Diff>> {
pool.get()
.await?
.interact(move |conn| generate_mapping(&elements, conn))
.await?
}

pub fn generate_mapping(elements: &[Element], conn: &Connection) -> Result<Vec<Diff>> {
let mut diffs = vec![];
let areas = Area::select_all(conn)?;
Expand Down
6 changes: 0 additions & 6 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,6 @@ pub fn pool() -> Result<Pool> {
.build()?)
}

pub fn open_connection() -> Result<Connection> {
let conn = Connection::open(data_dir_file("btcmap.db")?)?;
init_pragmas(&conn);
Ok(conn)
}

pub fn data_dir_file(name: &str) -> Result<PathBuf> {
#[allow(deprecated)]
let data_dir = std::env::home_dir()
Expand Down
51 changes: 40 additions & 11 deletions src/element/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ const COL_UPDATED_AT: &str = "updated_at";
const COL_DELETED_AT: &str = "deleted_at";

impl Element {
pub async fn insert_async(overpass_data: OverpassElement, pool: &Pool) -> Result<Self> {
pool.get()
.await?
.interact(move |conn| Self::insert(&overpass_data, conn))
.await?
}

pub fn insert(overpass_data: &OverpassElement, conn: &Connection) -> Result<Element> {
let sql = format!(
r#"
Expand Down Expand Up @@ -272,8 +279,19 @@ impl Element {
.ok_or(Error::Rusqlite(rusqlite::Error::QueryReturnedNoRows))
}

pub async fn set_overpass_data_async(
id: i64,
overpass_data: OverpassElement,
pool: &Pool,
) -> Result<Self> {
pool.get()
.await?
.interact(move |conn| Self::set_overpass_data(id, &overpass_data, conn))
.await?
}

pub fn set_overpass_data(
&self,
id: i64,
overpass_data: &OverpassElement,
conn: &Connection,
) -> Result<Element> {
Expand All @@ -287,11 +305,11 @@ impl Element {
conn.execute(
&sql,
named_params! {
":id": self.id,
":id": id,
":overpass_data": serde_json::to_string(overpass_data)?,
},
)?;
Element::select_by_id(self.id, conn)?
Element::select_by_id(id, conn)?
.ok_or(Error::Rusqlite(rusqlite::Error::QueryReturnedNoRows))
}

Expand Down Expand Up @@ -379,8 +397,19 @@ impl Element {
.ok_or(Error::Rusqlite(rusqlite::Error::QueryReturnedNoRows))
}

pub async fn set_deleted_at_async(
id: i64,
deleted_at: Option<OffsetDateTime>,
pool: &Pool,
) -> Result<Element> {
pool.get()
.await?
.interact(move |conn| Self::set_deleted_at(id, deleted_at, conn))
.await?
}

pub fn set_deleted_at(
&self,
id: i64,
deleted_at: Option<OffsetDateTime>,
conn: &Connection,
) -> Result<Element> {
Expand All @@ -396,7 +425,7 @@ impl Element {
conn.execute(
&sql,
named_params! {
":id": self.id,
":id": id,
":deleted_at": deleted_at.format(&Rfc3339)?,
},
)?;
Expand All @@ -409,10 +438,10 @@ impl Element {
WHERE {COL_ROWID} = :id
"#
);
conn.execute(&sql, named_params! { ":id": self.id })?;
conn.execute(&sql, named_params! { ":id": id })?;
}
};
Element::select_by_id(self.id, conn)?
Element::select_by_id(id, conn)?
.ok_or(Error::Rusqlite(rusqlite::Error::QueryReturnedNoRows))
}

Expand Down Expand Up @@ -556,8 +585,8 @@ mod test {
let conn = mock_conn();
let orig_data = OverpassElement::mock(1);
let override_data = OverpassElement::mock(2);
let element =
Element::insert(&orig_data, &conn)?.set_overpass_data(&override_data, &conn)?;
let element = Element::insert(&orig_data, &conn)?;
let element = Element::set_overpass_data(element.id, &override_data, &conn)?;
assert_eq!(override_data, element.overpass_data);
Ok(())
}
Expand Down Expand Up @@ -603,8 +632,8 @@ mod test {
fn set_deleted_at() -> Result<()> {
let conn = mock_conn();
let deleted_at = OffsetDateTime::now_utc();
let element = Element::insert(&OverpassElement::mock(1), &conn)?
.set_deleted_at(Some(deleted_at), &conn)?;
let element = Element::insert(&OverpassElement::mock(1), &conn)?;
let element = Element::set_deleted_at(element.id, Some(deleted_at), &conn)?;
assert_eq!(
deleted_at,
Element::select_by_id(element.id, &conn)?
Expand Down
34 changes: 29 additions & 5 deletions src/event/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@ const COL_UPDATED_AT: &str = "updated_at";
const COL_DELETED_AT: &str = "deleted_at";

impl Event {
pub async fn insert_async(
user_id: i64,
element_id: i64,
r#type: &str,
pool: &Pool,
) -> Result<Self> {
let r#type = r#type.to_string();
pool.get()
.await?
.interact(move |conn| Self::insert(user_id, element_id, &r#type, conn))
.await?
}

pub fn insert(user_id: i64, element_id: i64, r#type: &str, conn: &Connection) -> Result<Event> {
let sql = format!(
r#"
Expand Down Expand Up @@ -272,8 +285,19 @@ impl Event {
.optional()?)
}

pub fn patch_tags(&self, tags: &HashMap<String, Value>, conn: &Connection) -> Result<Event> {
Event::_patch_tags(self.id, tags, conn)
pub async fn patch_tags_async(
id: i64,
tags: HashMap<String, Value>,
pool: &Pool,
) -> Result<Self> {
pool.get()
.await?
.interact(move |conn| Self::patch_tags(id, &tags, conn))
.await?
}

pub fn patch_tags(id: i64, tags: &HashMap<String, Value>, conn: &Connection) -> Result<Event> {
Event::_patch_tags(id, tags, conn)
}

pub fn _patch_tags(id: i64, tags: &HashMap<String, Value>, conn: &Connection) -> Result<Event> {
Expand Down Expand Up @@ -423,14 +447,14 @@ mod test {
let event = Event::insert(user.id, element.id, "", &conn)?;
let mut tags = HashMap::new();
tags.insert(tag_1_name.into(), tag_1_value_1.clone());
let event = event.patch_tags(&tags, &conn)?;
let event = Event::patch_tags(event.id, &tags, &conn)?;
assert_eq!(&tag_1_value_1, event.tag(tag_1_name));
tags.insert(tag_1_name.into(), tag_1_value_2.clone());
let event = event.patch_tags(&tags, &conn)?;
let event = Event::patch_tags(event.id, &tags, &conn)?;
assert_eq!(&tag_1_value_2, event.tag(tag_1_name));
tags.clear();
tags.insert(tag_2_name.into(), tag_2_value.clone());
let event = event.patch_tags(&tags, &conn)?;
let event = Event::patch_tags(event.id, &tags, &conn)?;
assert!(event.tags.contains_key(tag_1_name));
assert_eq!(&tag_2_value, event.tag(tag_2_name));
Ok(())
Expand Down
19 changes: 11 additions & 8 deletions src/event/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::osm;
use crate::user;
use crate::user::User;
use crate::Result;
use rusqlite::Connection;
use deadpool_sqlite::Pool;
use serde_json::Value;
use std::ops::Add;
use time::format_description::well_known::Rfc3339;
Expand All @@ -14,9 +14,11 @@ use tracing::error;
use tracing::info;
use tracing::warn;

pub async fn on_new_event(event: &Event, conn: &Connection) -> Result<()> {
user::service::insert_user_if_not_exists(event.user_id, conn).await?;
let user = User::select_by_id(event.user_id, conn)?.unwrap();
pub async fn on_new_event(event: &Event, pool: &Pool) -> Result<()> {
user::service::insert_user_if_not_exists(event.user_id, pool).await?;
let user = User::select_by_id_async(event.user_id, pool)
.await?
.unwrap();

let message = match event.r#type.as_str() {
"create" => format!(
Expand All @@ -34,7 +36,7 @@ pub async fn on_new_event(event: &Event, conn: &Connection) -> Result<()> {
_ => "".into(),
};
info!(message);
let conf = Conf::select(conn)?;
let conf = Conf::select_async(pool).await?;
discord::post_message(conf.discord_webhook_osm_changes, message).await;

if user.tags.get("osm:missing") == Some(&Value::Bool(true)) {
Expand Down Expand Up @@ -66,18 +68,19 @@ pub async fn on_new_event(event: &Event, conn: &Connection) -> Result<()> {
new_osm_data = serde_json::to_string(&new_osm_data)?,
"User data changed",
);
User::set_osm_data(user.id, &new_osm_data, conn)?;
User::set_osm_data_async(user.id, new_osm_data, pool).await?;
} else {
info!("User data didn't change")
}

let now = OffsetDateTime::now_utc();
let now: String = now.format(&Rfc3339)?;
User::set_tag(user.id, "osm:sync:date", &Value::String(now), conn)?;
User::set_tag_async(user.id, "osm:sync:date".into(), Value::String(now), pool)
.await?;
}
None => {
warn!(user.osm_data.id, "User no longer exists on OSM");
User::set_tag(user.id, "osm:missing", &Value::Bool(true), conn)?;
User::set_tag_async(user.id, "osm:missing".into(), Value::Bool(true), pool).await?;
}
},
Err(e) => error!("Failed to fetch user {} {}", user.osm_data.id, e),
Expand Down
2 changes: 1 addition & 1 deletion src/rpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ pub async fn handle(
),
RpcMethod::SyncElements => RpcResponse::from(
req.id.clone(),
super::sync_elements::run(&admin.unwrap(), &conf).await?,
super::sync_elements::run(&admin.unwrap(), &pool, &conf).await?,
),
RpcMethod::GenerateElementIcons => RpcResponse::from(
req.id.clone(),
Expand Down
8 changes: 4 additions & 4 deletions src/rpc/sync_elements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use crate::admin::Admin;
use crate::conf::Conf;
use crate::osm::overpass;
use crate::sync::MergeResult;
use crate::{db, discord, sync, Result};
use crate::{discord, sync, Result};
use deadpool_sqlite::Pool;
use serde::Serialize;

#[derive(Serialize)]
Expand All @@ -12,11 +13,10 @@ pub struct Res {
pub merge_result: MergeResult,
}

pub async fn run(admin: &Admin, conf: &Conf) -> Result<Res> {
pub async fn run(admin: &Admin, pool: &Pool, conf: &Conf) -> Result<Res> {
let overpass_res = overpass::query_bitcoin_merchants().await?;
let overpass_elements_len = overpass_res.elements.len();
let mut conn = db::open_connection()?;
let merge_res = sync::merge_overpass_elements(overpass_res.elements, &mut conn).await?;
let merge_res = sync::merge_overpass_elements(overpass_res.elements, &pool).await?;
if merge_res.elements_created.len()
+ merge_res.elements_updated.len()
+ merge_res.elements_deleted.len()
Expand Down
Loading

0 comments on commit d97967c

Please sign in to comment.