From bef8c7d0517b1122d6f8f54f8e355183eef955c3 Mon Sep 17 00:00:00 2001 From: pushkarm029 Date: Fri, 19 Jul 2024 00:06:39 +0530 Subject: [PATCH 01/16] init Signed-off-by: pushkarm029 --- fs-storage/src/folder_storage.rs | 450 +++++++++++++++++++++++++++++++ fs-storage/src/lib.rs | 1 + 2 files changed, 451 insertions(+) create mode 100644 fs-storage/src/folder_storage.rs diff --git a/fs-storage/src/folder_storage.rs b/fs-storage/src/folder_storage.rs new file mode 100644 index 00000000..9e17351f --- /dev/null +++ b/fs-storage/src/folder_storage.rs @@ -0,0 +1,450 @@ +use serde::{Deserialize, Serialize}; +use std::fs::{self, File}; +use std::io::{Read, Write}; +use std::time::SystemTime; + +use std::{ + collections::BTreeMap, + path::{Path, PathBuf}, +}; + +use crate::base_storage::{BaseStorage, SyncStatus}; +use crate::monoid::Monoid; +// use crate::utils::read_version_2_fs; +use data_error::{ArklibError, Result}; + +const MAX_ENTRIES_PER_FILE: usize = 1000; +const STORAGE_VERSION: i32 = 1; + +pub struct FolderStorage +where + K: Ord, +{ + /// Label for logging + label: String, + /// Path to the underlying file where data is persisted + path: PathBuf, + /// Tracks the last known modification time of each file in memory. + /// This becomes equal to `last_disk_updated` only when data is written or read from disk. + disk_timestamps: BTreeMap, + /// Tracks the last known modification time of each file on disk. + /// This becomes equal to `last_ram_updated` only when data is written or read from disk. + ram_timestamps: BTreeMap, + current_file_index: usize, + current_file_entries: usize, + // Maps keys to file indices + index: BTreeMap, + data: FolderStorageData, +} + +/// A struct that represents the data stored in a [`FolderStorage`] instance. +/// +/// +/// This is the data that is serialized and deserialized to and from disk. +#[derive(Serialize, Deserialize)] +pub struct FolderStorageData +where + K: Ord, +{ + version: i32, + entries: BTreeMap, +} + +impl AsRef> for FolderStorageData +where + K: Ord, +{ + fn as_ref(&self) -> &BTreeMap { + &self.entries + } +} + +impl FolderStorage +where + K: Ord + + Clone + + serde::Serialize + + serde::de::DeserializeOwned + + std::str::FromStr + + std::fmt::Debug, + V: Clone + + serde::Serialize + + serde::de::DeserializeOwned + + std::str::FromStr + + Monoid, +{ + pub fn new(label: String, path: &Path) -> Result { + let mut storage = Self { + label, + path: path.to_path_buf(), + disk_timestamps: BTreeMap::new(), + ram_timestamps: BTreeMap::new(), + current_file_index: 0, + current_file_entries: 0, + index: BTreeMap::new(), + data: FolderStorageData { + version: STORAGE_VERSION, + entries: BTreeMap::new(), + }, + }; + storage.load_index()?; + + if Path::exists(path) { + storage.read_fs()?; + } + + Ok(storage) + } + + fn load_index(&mut self) -> Result<()> { + let index_path: PathBuf = self.path.join("index.json"); + if index_path.exists() { + let mut file = File::open(index_path)?; + let mut contents = String::new(); + file.read_to_string(&mut contents)?; + self.index = serde_json::from_str(&contents)?; + self.current_file_index = + *self.index.values().max().unwrap_or(&0) + 1; // Correct? + } + Ok(()) + } + + fn save_index(&self) -> Result<()> { + let index_path = self.path.join("index.json"); + let mut file = File::create(index_path)?; + let contents = serde_json::to_string(&self.index)?; + file.write_all(contents.as_bytes())?; + Ok(()) + } + + fn get_file_path(&self, file_index: usize) -> PathBuf { + self.path + .join(format!("data_{}.json", file_index)) + } + + /// Load mapping from file + fn load_fs_data(&self) -> Result> { + if !self.path.exists() { + return Err(ArklibError::Storage( + self.label.clone(), + "File does not exist".to_owned(), + )); + } + + for entry in fs::read_dir(self.path.clone())? { + let path = entry?.path(); + log::info!("Reading value from: {:?}", path); + + if !path.is_file() || path.file_name().unwrap() == "index.json" { + continue; + } + + let metadata = fs::metadata(&path)?; + let new_timestamp = metadata.modified()?; + + let file_index = path + .file_stem() + .unwrap() + .to_str() + .unwrap() + .split('_') + .nth(1) + .unwrap() + .parse::() + .unwrap(); + + let mut file = File::open(&path)?; + let mut contents: String = String::new(); + file.read_to_string(&mut contents)?; + + let data: BTreeMap = serde_json::from_str(&contents)?; + + // for (id, value) in data { + // let disk_timestamp = self + // .disk_timestamps + // .get(&id) + // .cloned() + // .unwrap_or(SystemTime::UNIX_EPOCH); + // if disk_timestamp < new_timestamp { + // self.data.entries.insert(id.clone(), value); + // self.disk_timestamps.insert(id, new_timestamp); + // self.index.insert(id, file_index); + // } + // } + + // self.disk_timestamps.extend(new_timestamps); + // self.save_index()?; + + // Ok(new_value_by_id) + } + + // let file = fs::File::open(&self.path)?; + // let data: FolderStorageData = serde_json::from_reader(file) + // .map_err(|err| { + // ArklibError::Storage(self.label.clone(), err.to_string()) + // })?; + // let version = data.version; + // if version != STORAGE_VERSION { + // return Err(ArklibError::Storage( + // self.label.clone(), + // format!( + // "Storage version mismatch: expected {}, got {}", + // STORAGE_VERSION, version + // ), + // )); + // } + + Ok(data) + } + + fn find_changed_ids(&self) -> Vec { + self.ram_timestamps + .iter() + .filter_map(|(id, ram_ft)| { + let disk_ft = self.disk_timestamps.get(id); + if disk_ft.is_none() || disk_ft.unwrap() != ram_ft { + Some(id.clone()) + } else { + None + } + }) + .collect() + } +} + +impl BaseStorage for FolderStorage +where + K: Ord + + Clone + + serde::Serialize + + serde::de::DeserializeOwned + + std::str::FromStr + + std::fmt::Debug, + V: Clone + + serde::Serialize + + serde::de::DeserializeOwned + + std::str::FromStr + + Monoid, +{ + fn set(&mut self, id: K, value: V) { + // Perform all this either in init or readFS + // let file_index = if let Some(&existing_index) = self.index.get(&id) { + // existing_index + // } else if self.current_file_entries >= MAX_ENTRIES_PER_FILE { + // self.current_file_index += 1; + // self.current_file_entries = 0; + // self.current_file_index + // } else { + // self.current_file_index + // }; + + // let file_path = self.get_file_path(file_index); + // let mut data: BTreeMap = if file_path.exists() { + // let mut file = File::open(&file_path).unwrap(); + // let mut contents = String::new(); + // file.read_to_string(&mut contents).unwrap(); + // serde_json::from_str(&contents).unwrap() + // } else { + // BTreeMap::new() + // }; + + self.data.entries.insert(id.clone(), value); + self.ram_timestamps.insert(id, SystemTime::now()); + + // data.insert(id.clone(), value); + // let mut file = File::create(file_path)?; + // let contents = serde_json::to_string(&data)?; + // file.write_all(contents.as_bytes())?; // remove this instead add to a self.data + + // self.index.insert(id.clone(), file_index); + // self.current_file_entries += 1; + // self.save_index()?; + + // let now = SystemTime::now() + // .duration_since(UNIX_EPOCH) + // .unwrap() + // .as_secs(); + // self.ram_timestamps.insert( + // id, + // SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(now), + // ); + } + + fn remove(&mut self, id: &K) -> Result<()> { + // if let Some(&file_index) = self.index.get(id) { + // let file_path = self.get_file_path(file_index); + // let mut file = File::open(&file_path)?; + // let mut contents = String::new(); + // file.read_to_string(&mut contents)?; + // let mut data: BTreeMap = + // serde_json::from_str(&contents)?; + + // data.remove(id); + + // let mut file = File::create(file_path)?; + // let contents = serde_json::to_string(&data)?; + // file.write_all(contents.as_bytes())?; + + // self.index.remove(id); + // self.save_index()?; + // self.ram_timestamps.remove(id); + // self.disk_timestamps.remove(id); + // } + self.data.entries.remove(id).ok_or_else(|| { + ArklibError::Storage(self.label.clone(), "Key not found".to_owned()) + })?; + self.ram_timestamps.remove(id); + Ok(()) + } + + /// Compare the timestamp of the storage file + /// with the timestamp of the in-memory storage and the last written + /// to time to determine if either of the two requires syncing. + fn sync_status(&self, id: &K) -> Result { + let file_updated = fs::metadata(&self.path)?.modified()?; + // Determine the synchronization status based on the modification times + // Conditions: + // 1. If both the in-memory storage and the storage on disk have been modified + // since the last write, then the storage is diverged. + // 2. If only the in-memory storage has been modified since the last write, + // then the storage on disk is stale. + // 3. If only the storage on disk has been modified since the last write, + // then the in-memory storage is stale. + // 4. If neither the in-memory storage nor the storage on disk has been modified + // since the last write, then the storage is in sync. + let status = match ( + self.ram_timestamps.get(id) > self.disk_timestamps.get(id), + file_updated > self.disk_timestamps.get(id).unwrap().clone(), + ) { + (true, true) => SyncStatus::Diverge, + (true, false) => SyncStatus::StorageStale, + (false, true) => SyncStatus::MappingStale, + (false, false) => SyncStatus::InSync, + }; + + log::info!("{} sync status is {}", self.label, status); + Ok(status) + } + + /// Sync the in-memory storage with the storage on disk + fn sync(&mut self) -> Result<()> { + match self.sync_status()? { + SyncStatus::InSync => Ok(()), + SyncStatus::MappingStale => self.read_fs().map(|_| ()), + SyncStatus::StorageStale => self.write_fs().map(|_| ()), + SyncStatus::Diverge => { + let data = self.load_fs_data()?; + self.merge_from(&data)?; + self.write_fs()?; + Ok(()) + } + } + } + + fn read_fs(&mut self) -> Result<&BTreeMap> { + let data = self.load_fs_data()?; + self.data = data; + Ok(&self.data.entries) + } + + /// Get a value from the internal mapping + fn get(&self, id: &K) -> Option<&V> { + self.data.entries.get(id) + } + + /// Write the data to file + /// + /// Update the modified timestamp in file metadata to avoid OS timing issues + /// https://github.com/ARK-Builders/ark-rust/pull/63#issuecomment-2163882227 + fn write_fs(&mut self) -> Result<()> { + fs::create_dir_all(&self.path)?; + let changed_values: BTreeMap = BTreeMap::new(); // TODO + for (id, val) in changed_values.clone() { + let file_index = if let Some(&existing_index) = self.index.get(&id) + { + existing_index + } else if self.current_file_entries >= MAX_ENTRIES_PER_FILE { + self.current_file_index += 1; + self.current_file_entries = 0; + self.current_file_index + } else { + self.current_file_index + }; + + let file_path = self.get_file_path(file_index); + let mut data: BTreeMap = if file_path.exists() { + let mut file = File::open(&file_path).unwrap(); + let mut contents = String::new(); + file.read_to_string(&mut contents).unwrap(); + serde_json::from_str(&contents).unwrap() + } else { + BTreeMap::new() + }; + + data.insert(id.clone(), val); + + let mut file = File::create(file_path)?; + let contents = serde_json::to_string(&data)?; + file.write_all(contents.as_bytes())?; + + self.index.insert(id.clone(), file_index); + self.current_file_entries += 1; + self.save_index()?; + + let now = SystemTime::now(); + self.ram_timestamps.insert(id.clone(), now); + self.disk_timestamps.insert(id, now); + } + + log::info!( + "{} {} entries have been written", + self.label, + changed_values.len() + ); + + // let changed_value_by_ids: BTreeMap<_, _> = self + // .find_changed_ids() + // .into_iter() + // .filter_map(|id| value_by_id.get(&id).map(|v| (id, v.clone()))) + // .collect(); + + Ok(()) + } + + /// Erase the file from disk + /// Implement later + fn erase(&self) -> Result<()> { + unimplemented!("erase") + // fs::remove_file(&self.path).map_err(|err| { + // ArklibError::Storage(self.label.clone(), err.to_string()) + // }) + } + + /// Merge the data from another storage instance into this storage instance + fn merge_from(&mut self, _other: impl AsRef>) -> Result<()> + where + V: Monoid, + { + unimplemented!("merge_from") + // let other_entries = other.as_ref(); + // for (key, value) in other_entries { + // if let Some(existing_value) = self.data.entries.get(key) { + // let resolved_value = V::combine(existing_value, value); + // self.set(key.clone(), resolved_value); + // } else { + // self.set(key.clone(), value.clone()) + // } + // } + // self.modified = std::time::SystemTime::now(); + // Ok(()) + } +} + +impl AsRef> for FolderStorage +where + K: Ord, +{ + fn as_ref(&self) -> &BTreeMap { + &self.data.entries + } +} diff --git a/fs-storage/src/lib.rs b/fs-storage/src/lib.rs index bc1442b7..83e3868c 100644 --- a/fs-storage/src/lib.rs +++ b/fs-storage/src/lib.rs @@ -1,6 +1,7 @@ pub mod base_storage; pub mod btreemap_iter; pub mod file_storage; +pub mod folder_storage; #[cfg(feature = "jni-bindings")] pub mod jni; pub mod monoid; From 181428a653c7d5a34d8b8935567c74d9c8e847e6 Mon Sep 17 00:00:00 2001 From: pushkarm029 Date: Sat, 20 Jul 2024 22:54:26 +0530 Subject: [PATCH 02/16] fix Signed-off-by: pushkarm029 --- fs-storage/src/folder_storage.rs | 419 +++++++++++++------------------ 1 file changed, 174 insertions(+), 245 deletions(-) diff --git a/fs-storage/src/folder_storage.rs b/fs-storage/src/folder_storage.rs index 9e17351f..3518695d 100644 --- a/fs-storage/src/folder_storage.rs +++ b/fs-storage/src/folder_storage.rs @@ -1,8 +1,7 @@ use serde::{Deserialize, Serialize}; use std::fs::{self, File}; -use std::io::{Read, Write}; +use std::io::Write; use std::time::SystemTime; - use std::{ collections::BTreeMap, path::{Path, PathBuf}, @@ -10,12 +9,23 @@ use std::{ use crate::base_storage::{BaseStorage, SyncStatus}; use crate::monoid::Monoid; -// use crate::utils::read_version_2_fs; +use crate::utils::read_version_2_fs; use data_error::{ArklibError, Result}; +/* +Note on `FolderStorage` Versioning: + +`FolderStorage` is a basic key-value storage system that persists data to disk. + +In version 2, `FolderStorage` stored data in a plaintext format. +Starting from version 3, data is stored in JSON format. + +For backward compatibility, we provide a helper function `read_version_2_fs` to read version 2 format. +*/ +const STORAGE_VERSION: i32 = 3; const MAX_ENTRIES_PER_FILE: usize = 1000; -const STORAGE_VERSION: i32 = 1; +/// Represents a file storage system that persists data to disk. pub struct FolderStorage where K: Ord, @@ -24,16 +34,12 @@ where label: String, /// Path to the underlying file where data is persisted path: PathBuf, - /// Tracks the last known modification time of each file in memory. - /// This becomes equal to `last_disk_updated` only when data is written or read from disk. - disk_timestamps: BTreeMap, - /// Tracks the last known modification time of each file on disk. - /// This becomes equal to `last_ram_updated` only when data is written or read from disk. - ram_timestamps: BTreeMap, - current_file_index: usize, - current_file_entries: usize, - // Maps keys to file indices - index: BTreeMap, + /// Last modified time of internal mapping. This becomes equal to + /// `written_to_disk` only when data is written or read from disk. + modified: SystemTime, + /// Last time the data was written to disk. This becomes equal to + /// `modified` only when data is written or read from disk. + written_to_disk: SystemTime, data: FolderStorageData, } @@ -65,29 +71,30 @@ where + Clone + serde::Serialize + serde::de::DeserializeOwned - + std::str::FromStr - + std::fmt::Debug, + + std::str::FromStr, V: Clone + serde::Serialize + serde::de::DeserializeOwned + std::str::FromStr + Monoid, { + /// Create a new file storage with a diagnostic label and file path + /// The storage will be initialized using the disk data, if the path exists + /// + /// Note: if the file storage already exists, the data will be read from the file + /// without overwriting it. pub fn new(label: String, path: &Path) -> Result { + let time = SystemTime::now(); let mut storage = Self { label, - path: path.to_path_buf(), - disk_timestamps: BTreeMap::new(), - ram_timestamps: BTreeMap::new(), - current_file_index: 0, - current_file_entries: 0, - index: BTreeMap::new(), + path: PathBuf::from(path), + modified: time, + written_to_disk: time, data: FolderStorageData { version: STORAGE_VERSION, entries: BTreeMap::new(), }, }; - storage.load_index()?; if Path::exists(path) { storage.read_fs()?; @@ -96,32 +103,6 @@ where Ok(storage) } - fn load_index(&mut self) -> Result<()> { - let index_path: PathBuf = self.path.join("index.json"); - if index_path.exists() { - let mut file = File::open(index_path)?; - let mut contents = String::new(); - file.read_to_string(&mut contents)?; - self.index = serde_json::from_str(&contents)?; - self.current_file_index = - *self.index.values().max().unwrap_or(&0) + 1; // Correct? - } - Ok(()) - } - - fn save_index(&self) -> Result<()> { - let index_path = self.path.join("index.json"); - let mut file = File::create(index_path)?; - let contents = serde_json::to_string(&self.index)?; - file.write_all(contents.as_bytes())?; - Ok(()) - } - - fn get_file_path(&self, file_index: usize) -> PathBuf { - self.path - .join(format!("data_{}.json", file_index)) - } - /// Load mapping from file fn load_fs_data(&self) -> Result> { if !self.path.exists() { @@ -131,85 +112,73 @@ where )); } - for entry in fs::read_dir(self.path.clone())? { - let path = entry?.path(); - log::info!("Reading value from: {:?}", path); + let mut data = FolderStorageData { + version: STORAGE_VERSION, + entries: BTreeMap::new(), + }; - if !path.is_file() || path.file_name().unwrap() == "index.json" { - continue; + let index_path = self.path.join("index.json"); + if index_path.exists() { + let index_file = File::open(&index_path)?; + let index: BTreeMap = + serde_json::from_reader(index_file)?; + + for (_key, file_index) in index { + let file_path = self + .path + .join(format!("data_{}.json", file_index)); + if file_path.exists() { + // First check if the file starts with "version: 2" + let file_content = + std::fs::read_to_string(file_path.clone())?; + if file_content.starts_with("version: 2") { + // Attempt to parse the file using the legacy version 2 storage format of FolderStorage. + match read_version_2_fs(&file_path) { + Ok(legacy_data) => { + log::info!( + "Version 2 storage format detected for {}", + self.label + ); + data.version = 2; + data.entries.extend(legacy_data); + continue; + } + Err(_) => { + return Err(ArklibError::Storage( + self.label.clone(), + "Storage seems to be version 2, but failed to parse" + .to_owned(), + )); + } + }; + } + + let file = fs::File::open(&file_path)?; + let file_data: FolderStorageData = + serde_json::from_reader(file).map_err(|err| { + ArklibError::Storage( + self.label.clone(), + err.to_string(), + ) + })?; + + if file_data.version != STORAGE_VERSION { + return Err(ArklibError::Storage( + self.label.clone(), + format!( + "Storage version mismatch: expected {}, got {}", + STORAGE_VERSION, file_data.version + ), + )); + } + + data.entries.extend(file_data.entries); + } } - - let metadata = fs::metadata(&path)?; - let new_timestamp = metadata.modified()?; - - let file_index = path - .file_stem() - .unwrap() - .to_str() - .unwrap() - .split('_') - .nth(1) - .unwrap() - .parse::() - .unwrap(); - - let mut file = File::open(&path)?; - let mut contents: String = String::new(); - file.read_to_string(&mut contents)?; - - let data: BTreeMap = serde_json::from_str(&contents)?; - - // for (id, value) in data { - // let disk_timestamp = self - // .disk_timestamps - // .get(&id) - // .cloned() - // .unwrap_or(SystemTime::UNIX_EPOCH); - // if disk_timestamp < new_timestamp { - // self.data.entries.insert(id.clone(), value); - // self.disk_timestamps.insert(id, new_timestamp); - // self.index.insert(id, file_index); - // } - // } - - // self.disk_timestamps.extend(new_timestamps); - // self.save_index()?; - - // Ok(new_value_by_id) } - // let file = fs::File::open(&self.path)?; - // let data: FolderStorageData = serde_json::from_reader(file) - // .map_err(|err| { - // ArklibError::Storage(self.label.clone(), err.to_string()) - // })?; - // let version = data.version; - // if version != STORAGE_VERSION { - // return Err(ArklibError::Storage( - // self.label.clone(), - // format!( - // "Storage version mismatch: expected {}, got {}", - // STORAGE_VERSION, version - // ), - // )); - // } - Ok(data) } - - fn find_changed_ids(&self) -> Vec { - self.ram_timestamps - .iter() - .filter_map(|(id, ram_ft)| { - let disk_ft = self.disk_timestamps.get(id); - if disk_ft.is_none() || disk_ft.unwrap() != ram_ft { - Some(id.clone()) - } else { - None - } - }) - .collect() - } } impl BaseStorage for FolderStorage @@ -218,90 +187,34 @@ where + Clone + serde::Serialize + serde::de::DeserializeOwned - + std::str::FromStr - + std::fmt::Debug, + + std::str::FromStr, V: Clone + serde::Serialize + serde::de::DeserializeOwned + std::str::FromStr + Monoid, { - fn set(&mut self, id: K, value: V) { - // Perform all this either in init or readFS - // let file_index = if let Some(&existing_index) = self.index.get(&id) { - // existing_index - // } else if self.current_file_entries >= MAX_ENTRIES_PER_FILE { - // self.current_file_index += 1; - // self.current_file_entries = 0; - // self.current_file_index - // } else { - // self.current_file_index - // }; - - // let file_path = self.get_file_path(file_index); - // let mut data: BTreeMap = if file_path.exists() { - // let mut file = File::open(&file_path).unwrap(); - // let mut contents = String::new(); - // file.read_to_string(&mut contents).unwrap(); - // serde_json::from_str(&contents).unwrap() - // } else { - // BTreeMap::new() - // }; - - self.data.entries.insert(id.clone(), value); - self.ram_timestamps.insert(id, SystemTime::now()); - - // data.insert(id.clone(), value); - // let mut file = File::create(file_path)?; - // let contents = serde_json::to_string(&data)?; - // file.write_all(contents.as_bytes())?; // remove this instead add to a self.data - - // self.index.insert(id.clone(), file_index); - // self.current_file_entries += 1; - // self.save_index()?; - - // let now = SystemTime::now() - // .duration_since(UNIX_EPOCH) - // .unwrap() - // .as_secs(); - // self.ram_timestamps.insert( - // id, - // SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(now), - // ); + /// Set a key-value pair in the internal mapping + fn set(&mut self, key: K, value: V) { + self.data.entries.insert(key, value); + self.modified = std::time::SystemTime::now(); } + /// Remove an entry from the internal mapping given a key fn remove(&mut self, id: &K) -> Result<()> { - // if let Some(&file_index) = self.index.get(id) { - // let file_path = self.get_file_path(file_index); - // let mut file = File::open(&file_path)?; - // let mut contents = String::new(); - // file.read_to_string(&mut contents)?; - // let mut data: BTreeMap = - // serde_json::from_str(&contents)?; - - // data.remove(id); - - // let mut file = File::create(file_path)?; - // let contents = serde_json::to_string(&data)?; - // file.write_all(contents.as_bytes())?; - - // self.index.remove(id); - // self.save_index()?; - // self.ram_timestamps.remove(id); - // self.disk_timestamps.remove(id); - // } self.data.entries.remove(id).ok_or_else(|| { ArklibError::Storage(self.label.clone(), "Key not found".to_owned()) })?; - self.ram_timestamps.remove(id); + self.modified = std::time::SystemTime::now(); Ok(()) } /// Compare the timestamp of the storage file /// with the timestamp of the in-memory storage and the last written /// to time to determine if either of the two requires syncing. - fn sync_status(&self, id: &K) -> Result { + fn sync_status(&self) -> Result { let file_updated = fs::metadata(&self.path)?.modified()?; + // Determine the synchronization status based on the modification times // Conditions: // 1. If both the in-memory storage and the storage on disk have been modified @@ -313,8 +226,8 @@ where // 4. If neither the in-memory storage nor the storage on disk has been modified // since the last write, then the storage is in sync. let status = match ( - self.ram_timestamps.get(id) > self.disk_timestamps.get(id), - file_updated > self.disk_timestamps.get(id).unwrap().clone(), + self.modified > self.written_to_disk, + file_updated > self.written_to_disk, ) { (true, true) => SyncStatus::Diverge, (true, false) => SyncStatus::StorageStale, @@ -341,9 +254,13 @@ where } } + /// Read the data from file fn read_fs(&mut self) -> Result<&BTreeMap> { let data = self.load_fs_data()?; + self.modified = fs::metadata(&self.path)?.modified()?; + self.written_to_disk = self.modified; self.data = data; + Ok(&self.data.entries) } @@ -357,86 +274,98 @@ where /// Update the modified timestamp in file metadata to avoid OS timing issues /// https://github.com/ARK-Builders/ark-rust/pull/63#issuecomment-2163882227 fn write_fs(&mut self) -> Result<()> { - fs::create_dir_all(&self.path)?; - let changed_values: BTreeMap = BTreeMap::new(); // TODO - for (id, val) in changed_values.clone() { - let file_index = if let Some(&existing_index) = self.index.get(&id) - { - existing_index - } else if self.current_file_entries >= MAX_ENTRIES_PER_FILE { - self.current_file_index += 1; - self.current_file_entries = 0; - self.current_file_index - } else { - self.current_file_index - }; + let parent_dir = self.path.parent().ok_or_else(|| { + ArklibError::Storage( + self.label.clone(), + "Failed to get parent directory".to_owned(), + ) + })?; + fs::create_dir_all(parent_dir)?; + + let mut current_file_index = 0; + let mut current_file_entries = 0; - let file_path = self.get_file_path(file_index); - let mut data: BTreeMap = if file_path.exists() { - let mut file = File::open(&file_path).unwrap(); - let mut contents = String::new(); - file.read_to_string(&mut contents).unwrap(); - serde_json::from_str(&contents).unwrap() + for (key, value) in &self.data.entries { + if current_file_entries >= MAX_ENTRIES_PER_FILE { + current_file_index += 1; + current_file_entries = 0; + } + + let file_path = self + .path + .join(format!("data_{}.json", current_file_index)); + let mut file_data: BTreeMap = if file_path.exists() { + let file = File::open(&file_path)?; + serde_json::from_reader(file)? } else { BTreeMap::new() }; - data.insert(id.clone(), val); - - let mut file = File::create(file_path)?; - let contents = serde_json::to_string(&data)?; - file.write_all(contents.as_bytes())?; + file_data.insert(key.clone(), value.clone()); + current_file_entries += 1; - self.index.insert(id.clone(), file_index); - self.current_file_entries += 1; - self.save_index()?; + let mut file = File::create(&file_path)?; + file.write_all( + serde_json::to_string_pretty(&file_data)?.as_bytes(), + )?; + file.flush()?; - let now = SystemTime::now(); - self.ram_timestamps.insert(id.clone(), now); - self.disk_timestamps.insert(id, now); + let new_timestamp = SystemTime::now(); + file.set_modified(new_timestamp)?; + file.sync_all()?; } + // Write the index file + // index stores K -> key, V -> file index in which key value pair is stored + let index: BTreeMap = self + .data + .entries + .keys() + .enumerate() + .map(|(i, k)| (k.clone(), i / MAX_ENTRIES_PER_FILE)) + .collect(); + let index_path = self.path.join("index.json"); + let mut index_file = File::create(index_path)?; + index_file + .write_all(serde_json::to_string_pretty(&index)?.as_bytes())?; + index_file.flush()?; + index_file.sync_all()?; + + let new_timestamp = SystemTime::now(); + self.modified = new_timestamp; + self.written_to_disk = new_timestamp; + log::info!( "{} {} entries have been written", self.label, - changed_values.len() + self.data.entries.len() ); - - // let changed_value_by_ids: BTreeMap<_, _> = self - // .find_changed_ids() - // .into_iter() - // .filter_map(|id| value_by_id.get(&id).map(|v| (id, v.clone()))) - // .collect(); - Ok(()) } /// Erase the file from disk - /// Implement later fn erase(&self) -> Result<()> { - unimplemented!("erase") - // fs::remove_file(&self.path).map_err(|err| { - // ArklibError::Storage(self.label.clone(), err.to_string()) - // }) + fs::remove_dir(&self.path).map_err(|err| { + ArklibError::Storage(self.label.clone(), err.to_string()) + }) } /// Merge the data from another storage instance into this storage instance - fn merge_from(&mut self, _other: impl AsRef>) -> Result<()> + fn merge_from(&mut self, other: impl AsRef>) -> Result<()> where V: Monoid, { - unimplemented!("merge_from") - // let other_entries = other.as_ref(); - // for (key, value) in other_entries { - // if let Some(existing_value) = self.data.entries.get(key) { - // let resolved_value = V::combine(existing_value, value); - // self.set(key.clone(), resolved_value); - // } else { - // self.set(key.clone(), value.clone()) - // } - // } - // self.modified = std::time::SystemTime::now(); - // Ok(()) + let other_entries = other.as_ref(); + for (key, value) in other_entries { + if let Some(existing_value) = self.data.entries.get(key) { + let resolved_value = V::combine(existing_value, value); + self.set(key.clone(), resolved_value); + } else { + self.set(key.clone(), value.clone()) + } + } + self.modified = std::time::SystemTime::now(); + Ok(()) } } From afa0b37b69ea76432c49f068dcaaabfebb49f09d Mon Sep 17 00:00:00 2001 From: pushkarm029 Date: Tue, 23 Jul 2024 22:10:10 +0530 Subject: [PATCH 03/16] fix Signed-off-by: pushkarm029 --- fs-storage/Cargo.toml | 2 +- fs-storage/src/folder_storage.rs | 251 +++++++++++++------------------ 2 files changed, 105 insertions(+), 148 deletions(-) diff --git a/fs-storage/Cargo.toml b/fs-storage/Cargo.toml index c6ffdd18..cf64ff58 100644 --- a/fs-storage/Cargo.toml +++ b/fs-storage/Cargo.toml @@ -17,7 +17,7 @@ serde_json = "1.0.82" serde = { version = "1.0.138", features = ["derive"] } jni = { version = "0.21.1", optional = true } jnix = { version = "0.5.1", features = ["derive"], optional = true } - +bincode = "1.3" data-error = { path = "../data-error" } diff --git a/fs-storage/src/folder_storage.rs b/fs-storage/src/folder_storage.rs index 3518695d..aa960da4 100644 --- a/fs-storage/src/folder_storage.rs +++ b/fs-storage/src/folder_storage.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; use std::fs::{self, File}; -use std::io::Write; +use std::io::{Read, Write}; use std::time::SystemTime; use std::{ collections::BTreeMap, @@ -9,7 +9,7 @@ use std::{ use crate::base_storage::{BaseStorage, SyncStatus}; use crate::monoid::Monoid; -use crate::utils::read_version_2_fs; +// use crate::utils::read_version_2_fs; use data_error::{ArklibError, Result}; /* @@ -23,7 +23,6 @@ Starting from version 3, data is stored in JSON format. For backward compatibility, we provide a helper function `read_version_2_fs` to read version 2 format. */ const STORAGE_VERSION: i32 = 3; -const MAX_ENTRIES_PER_FILE: usize = 1000; /// Represents a file storage system that persists data to disk. pub struct FolderStorage @@ -36,10 +35,12 @@ where path: PathBuf, /// Last modified time of internal mapping. This becomes equal to /// `written_to_disk` only when data is written or read from disk. - modified: SystemTime, + // modified: SystemTime, + ram_timestamps: BTreeMap, /// Last time the data was written to disk. This becomes equal to /// `modified` only when data is written or read from disk. - written_to_disk: SystemTime, + // written_to_disk: SystemTime, + disk_timestamps: BTreeMap, data: FolderStorageData, } @@ -71,7 +72,8 @@ where + Clone + serde::Serialize + serde::de::DeserializeOwned - + std::str::FromStr, + + std::str::FromStr + + std::fmt::Display, V: Clone + serde::Serialize + serde::de::DeserializeOwned @@ -84,12 +86,11 @@ where /// Note: if the file storage already exists, the data will be read from the file /// without overwriting it. pub fn new(label: String, path: &Path) -> Result { - let time = SystemTime::now(); let mut storage = Self { label, path: PathBuf::from(path), - modified: time, - written_to_disk: time, + ram_timestamps: BTreeMap::new(), + disk_timestamps: BTreeMap::new(), data: FolderStorageData { version: STORAGE_VERSION, entries: BTreeMap::new(), @@ -112,71 +113,52 @@ where )); } + if !self.path.is_dir() { + return Err(ArklibError::Storage( + self.label.clone(), + "Path is not a directory".to_owned(), + )); + } + let mut data = FolderStorageData { version: STORAGE_VERSION, entries: BTreeMap::new(), }; - let index_path = self.path.join("index.json"); - if index_path.exists() { - let index_file = File::open(&index_path)?; - let index: BTreeMap = - serde_json::from_reader(index_file)?; - - for (_key, file_index) in index { - let file_path = self - .path - .join(format!("data_{}.json", file_index)); - if file_path.exists() { - // First check if the file starts with "version: 2" - let file_content = - std::fs::read_to_string(file_path.clone())?; - if file_content.starts_with("version: 2") { - // Attempt to parse the file using the legacy version 2 storage format of FolderStorage. - match read_version_2_fs(&file_path) { - Ok(legacy_data) => { - log::info!( - "Version 2 storage format detected for {}", - self.label - ); - data.version = 2; - data.entries.extend(legacy_data); - continue; - } - Err(_) => { - return Err(ArklibError::Storage( - self.label.clone(), - "Storage seems to be version 2, but failed to parse" - .to_owned(), - )); - } - }; - } - - let file = fs::File::open(&file_path)?; - let file_data: FolderStorageData = - serde_json::from_reader(file).map_err(|err| { - ArklibError::Storage( - self.label.clone(), - err.to_string(), - ) - })?; - - if file_data.version != STORAGE_VERSION { - return Err(ArklibError::Storage( + // read_version_2_fs : unimplemented!() + + for entry in fs::read_dir(&self.path)? { + let entry = entry?; + let path = entry.path(); + if path.is_file() + && path.extension().map_or(false, |ext| ext == "bin") + { + let key = path + .file_stem() + .unwrap() + .to_str() + .unwrap() + .parse::() + .map_err(|_| { + ArklibError::Storage( self.label.clone(), - format!( - "Storage version mismatch: expected {}, got {}", - STORAGE_VERSION, file_data.version - ), - )); - } - - data.entries.extend(file_data.entries); - } + "Failed to parse key from filename".to_owned(), + ) + })?; + + let mut file = File::open(&path)?; + let mut buffer = Vec::new(); + file.read_to_end(&mut buffer)?; + + let value: V = bincode::deserialize(&buffer).map_err(|e| { + ArklibError::Storage( + self.label.clone(), + format!("Failed to deserialize value: {}", e), + ) + })?; + data.entries.insert(key, value); } } - Ok(data) } } @@ -187,7 +169,8 @@ where + Clone + serde::Serialize + serde::de::DeserializeOwned - + std::str::FromStr, + + std::str::FromStr + + std::fmt::Display, V: Clone + serde::Serialize + serde::de::DeserializeOwned @@ -196,8 +179,8 @@ where { /// Set a key-value pair in the internal mapping fn set(&mut self, key: K, value: V) { - self.data.entries.insert(key, value); - self.modified = std::time::SystemTime::now(); + self.data.entries.insert(key.clone(), value); + self.ram_timestamps.insert(key, SystemTime::now()); } /// Remove an entry from the internal mapping given a key @@ -205,7 +188,10 @@ where self.data.entries.remove(id).ok_or_else(|| { ArklibError::Storage(self.label.clone(), "Key not found".to_owned()) })?; - self.modified = std::time::SystemTime::now(); + // self.ram_timestamps.remove(id); + // OR + self.ram_timestamps + .insert(id.clone(), SystemTime::now()); Ok(()) } @@ -213,30 +199,7 @@ where /// with the timestamp of the in-memory storage and the last written /// to time to determine if either of the two requires syncing. fn sync_status(&self) -> Result { - let file_updated = fs::metadata(&self.path)?.modified()?; - - // Determine the synchronization status based on the modification times - // Conditions: - // 1. If both the in-memory storage and the storage on disk have been modified - // since the last write, then the storage is diverged. - // 2. If only the in-memory storage has been modified since the last write, - // then the storage on disk is stale. - // 3. If only the storage on disk has been modified since the last write, - // then the in-memory storage is stale. - // 4. If neither the in-memory storage nor the storage on disk has been modified - // since the last write, then the storage is in sync. - let status = match ( - self.modified > self.written_to_disk, - file_updated > self.written_to_disk, - ) { - (true, true) => SyncStatus::Diverge, - (true, false) => SyncStatus::StorageStale, - (false, true) => SyncStatus::MappingStale, - (false, false) => SyncStatus::InSync, - }; - - log::info!("{} sync status is {}", self.label, status); - Ok(status) + unimplemented!() } /// Sync the in-memory storage with the storage on disk @@ -257,10 +220,16 @@ where /// Read the data from file fn read_fs(&mut self) -> Result<&BTreeMap> { let data = self.load_fs_data()?; - self.modified = fs::metadata(&self.path)?.modified()?; - self.written_to_disk = self.modified; self.data = data; - + self.disk_timestamps.clear(); + for key in self.data.entries.keys() { + let file_path = self.path.join(format!("{}.bin", key)); + if let Ok(metadata) = fs::metadata(&file_path) { + if let Ok(modified) = metadata.modified() { + self.disk_timestamps.insert(key.clone(), modified); + } + } + } Ok(&self.data.entries) } @@ -274,66 +243,53 @@ where /// Update the modified timestamp in file metadata to avoid OS timing issues /// https://github.com/ARK-Builders/ark-rust/pull/63#issuecomment-2163882227 fn write_fs(&mut self) -> Result<()> { - let parent_dir = self.path.parent().ok_or_else(|| { - ArklibError::Storage( - self.label.clone(), - "Failed to get parent directory".to_owned(), - ) - })?; - fs::create_dir_all(parent_dir)?; - - let mut current_file_index = 0; - let mut current_file_entries = 0; + fs::create_dir_all(&self.path)?; for (key, value) in &self.data.entries { - if current_file_entries >= MAX_ENTRIES_PER_FILE { - current_file_index += 1; - current_file_entries = 0; - } - - let file_path = self - .path - .join(format!("data_{}.json", current_file_index)); - let mut file_data: BTreeMap = if file_path.exists() { - let file = File::open(&file_path)?; - serde_json::from_reader(file)? - } else { - BTreeMap::new() - }; - - file_data.insert(key.clone(), value.clone()); - current_file_entries += 1; + let file_path = self.path.join(format!("{}.bin", key)); + let encoded: Vec = bincode::serialize(value).map_err(|e| { + ArklibError::Storage( + self.label.clone(), + format!("Failed to serialize value: {}", e), + ) + })?; let mut file = File::create(&file_path)?; - file.write_all( - serde_json::to_string_pretty(&file_data)?.as_bytes(), - )?; + file.write_all(&encoded)?; file.flush()?; let new_timestamp = SystemTime::now(); file.set_modified(new_timestamp)?; file.sync_all()?; + + self.disk_timestamps + .insert(key.clone(), new_timestamp); } - // Write the index file - // index stores K -> key, V -> file index in which key value pair is stored - let index: BTreeMap = self - .data - .entries - .keys() - .enumerate() - .map(|(i, k)| (k.clone(), i / MAX_ENTRIES_PER_FILE)) - .collect(); - let index_path = self.path.join("index.json"); - let mut index_file = File::create(index_path)?; - index_file - .write_all(serde_json::to_string_pretty(&index)?.as_bytes())?; - index_file.flush()?; - index_file.sync_all()?; - - let new_timestamp = SystemTime::now(); - self.modified = new_timestamp; - self.written_to_disk = new_timestamp; + // Remove files for keys that no longer exist + for entry in fs::read_dir(&self.path)? { + let entry = entry?; + let path = entry.path(); + if path.is_file() + && path.extension().map_or(false, |ext| ext == "bin") + { + let key = path + .file_stem() + .unwrap() + .to_str() + .unwrap() + .parse::() + .map_err(|_| { + ArklibError::Storage( + self.label.clone(), + "Failed to parse key from filename".to_owned(), + ) + })?; + if !self.data.entries.contains_key(&key) { + fs::remove_file(path)?; + } + } + } log::info!( "{} {} entries have been written", @@ -363,8 +319,9 @@ where } else { self.set(key.clone(), value.clone()) } + self.ram_timestamps + .insert(key.clone(), SystemTime::now()); } - self.modified = std::time::SystemTime::now(); Ok(()) } } From 7600ab48d632dc5310ab4382397c6cdbc8df1c37 Mon Sep 17 00:00:00 2001 From: pushkarm029 Date: Wed, 24 Jul 2024 23:09:00 +0530 Subject: [PATCH 04/16] fix Signed-off-by: pushkarm029 --- fs-storage/src/folder_storage.rs | 144 ++++++++++++++++++++----------- fs-storage/src/utils.rs | 34 ++++++++ 2 files changed, 127 insertions(+), 51 deletions(-) diff --git a/fs-storage/src/folder_storage.rs b/fs-storage/src/folder_storage.rs index aa960da4..fcb2e200 100644 --- a/fs-storage/src/folder_storage.rs +++ b/fs-storage/src/folder_storage.rs @@ -10,12 +10,15 @@ use std::{ use crate::base_storage::{BaseStorage, SyncStatus}; use crate::monoid::Monoid; // use crate::utils::read_version_2_fs; +use crate::utils::remove_files_not_in_ram; use data_error::{ArklibError, Result}; /* Note on `FolderStorage` Versioning: `FolderStorage` is a basic key-value storage system that persists data to disk. +where the key is the path of the file inside the directory. + In version 2, `FolderStorage` stored data in a plaintext format. Starting from version 3, data is stored in JSON format. @@ -24,7 +27,7 @@ For backward compatibility, we provide a helper function `read_version_2_fs` to */ const STORAGE_VERSION: i32 = 3; -/// Represents a file storage system that persists data to disk. +/// Represents a folder storage system that persists data to disk. pub struct FolderStorage where K: Ord, @@ -33,13 +36,11 @@ where label: String, /// Path to the underlying file where data is persisted path: PathBuf, - /// Last modified time of internal mapping. This becomes equal to - /// `written_to_disk` only when data is written or read from disk. - // modified: SystemTime, + /// `ram_timestamps` can be used to track the last time a file was modified in memory. + /// where the key is the path of the file inside the directory. ram_timestamps: BTreeMap, - /// Last time the data was written to disk. This becomes equal to - /// `modified` only when data is written or read from disk. - // written_to_disk: SystemTime, + /// `disk_timestamps` can be used to track the last time a file written or read from disk. + /// where the key is the path of the file inside the directory. disk_timestamps: BTreeMap, data: FolderStorageData, } @@ -80,10 +81,10 @@ where + std::str::FromStr + Monoid, { - /// Create a new file storage with a diagnostic label and file path + /// Create a new folder storage with a diagnostic label and directory path /// The storage will be initialized using the disk data, if the path exists /// - /// Note: if the file storage already exists, the data will be read from the file + /// Note: if the folder storage already exists, the data will be read from the folder /// without overwriting it. pub fn new(label: String, path: &Path) -> Result { let mut storage = Self { @@ -104,8 +105,8 @@ where Ok(storage) } - /// Load mapping from file - fn load_fs_data(&self) -> Result> { + /// Load mapping from folder storage + fn load_fs_data(&mut self) -> Result> { if !self.path.exists() { return Err(ArklibError::Storage( self.label.clone(), @@ -125,6 +126,9 @@ where entries: BTreeMap::new(), }; + self.disk_timestamps.clear(); + self.ram_timestamps.clear(); + // read_version_2_fs : unimplemented!() for entry in fs::read_dir(&self.path)? { @@ -156,7 +160,14 @@ where format!("Failed to deserialize value: {}", e), ) })?; - data.entries.insert(key, value); + data.entries.insert(key.clone(), value); + + if let Ok(metadata) = fs::metadata(&path) { + if let Ok(modified) = metadata.modified() { + self.disk_timestamps.insert(key.clone(), modified); + self.ram_timestamps.insert(key, modified); + } + } } } Ok(data) @@ -195,11 +206,73 @@ where Ok(()) } - /// Compare the timestamp of the storage file - /// with the timestamp of the in-memory storage and the last written + /// Compare the timestamp of the storage files + /// with the timestamps of the in-memory storage and the last written /// to time to determine if either of the two requires syncing. fn sync_status(&self) -> Result { - unimplemented!() + let mut ram_newer = false; + let mut disk_newer = false; + + for (key, ram_timestamp) in &self.ram_timestamps { + let file_path = self.path.join(format!("{}.bin", key)); + + if let Ok(metadata) = fs::metadata(&file_path) { + if let Ok(disk_timestamp) = metadata.modified() { + match ram_timestamp.cmp(&disk_timestamp) { + std::cmp::Ordering::Greater => ram_newer = true, + std::cmp::Ordering::Less => disk_newer = true, + std::cmp::Ordering::Equal => {} + } + } else { + // If we can't read the disk timestamp, assume RAM is newer + ram_newer = true; + } + } else { + // If the file doesn't exist on disk, RAM is newer + ram_newer = true; + } + + // If we've found both RAM and disk modifications, we can stop checking + if ram_newer && disk_newer { + break; + } + } + + // Check for files on disk that aren't in RAM + for entry in fs::read_dir(&self.path)? { + let entry = entry?; + let path = entry.path(); + if path.is_file() + && path.extension().map_or(false, |ext| ext == "bin") + { + let key = path + .file_stem() + .unwrap() + .to_str() + .unwrap() + .parse::() + .map_err(|_| { + ArklibError::Storage( + self.label.clone(), + "Failed to parse key from filename".to_owned(), + ) + })?; + if !self.ram_timestamps.contains_key(&key) { + disk_newer = true; + break; + } + } + } + + let status = match (ram_newer, disk_newer) { + (false, false) => SyncStatus::InSync, + (true, false) => SyncStatus::StorageStale, + (false, true) => SyncStatus::MappingStale, + (true, true) => SyncStatus::Diverge, + }; + + log::info!("{} sync status is {}", self.label, status); + Ok(status) } /// Sync the in-memory storage with the storage on disk @@ -217,19 +290,10 @@ where } } - /// Read the data from file + /// Read the data from folder storage fn read_fs(&mut self) -> Result<&BTreeMap> { let data = self.load_fs_data()?; self.data = data; - self.disk_timestamps.clear(); - for key in self.data.entries.keys() { - let file_path = self.path.join(format!("{}.bin", key)); - if let Ok(metadata) = fs::metadata(&file_path) { - if let Ok(modified) = metadata.modified() { - self.disk_timestamps.insert(key.clone(), modified); - } - } - } Ok(&self.data.entries) } @@ -238,7 +302,7 @@ where self.data.entries.get(id) } - /// Write the data to file + /// Write the data to folder /// /// Update the modified timestamp in file metadata to avoid OS timing issues /// https://github.com/ARK-Builders/ark-rust/pull/63#issuecomment-2163882227 @@ -267,29 +331,7 @@ where } // Remove files for keys that no longer exist - for entry in fs::read_dir(&self.path)? { - let entry = entry?; - let path = entry.path(); - if path.is_file() - && path.extension().map_or(false, |ext| ext == "bin") - { - let key = path - .file_stem() - .unwrap() - .to_str() - .unwrap() - .parse::() - .map_err(|_| { - ArklibError::Storage( - self.label.clone(), - "Failed to parse key from filename".to_owned(), - ) - })?; - if !self.data.entries.contains_key(&key) { - fs::remove_file(path)?; - } - } - } + remove_files_not_in_ram(&self.path, &self.label, &self.data.entries); log::info!( "{} {} entries have been written", @@ -299,14 +341,14 @@ where Ok(()) } - /// Erase the file from disk + /// Erase the folder from disk fn erase(&self) -> Result<()> { fs::remove_dir(&self.path).map_err(|err| { ArklibError::Storage(self.label.clone(), err.to_string()) }) } - /// Merge the data from another storage instance into this storage instance + /// Merge the data from another folder storage instance into this folder storage instance fn merge_from(&mut self, other: impl AsRef>) -> Result<()> where V: Monoid, diff --git a/fs-storage/src/utils.rs b/fs-storage/src/utils.rs index b5b6830a..cce9e600 100644 --- a/fs-storage/src/utils.rs +++ b/fs-storage/src/utils.rs @@ -1,5 +1,7 @@ +use data_error::ArklibError; use data_error::Result; use std::collections::BTreeMap; +use std::fs; use std::path::Path; /// Parses version 2 `FileStorage` format and returns the data as a BTreeMap @@ -52,6 +54,38 @@ where Ok(data) } +pub fn remove_files_not_in_ram( + path: &Path, + label: &str, + data: &BTreeMap, +) where + K: std::str::FromStr + std::cmp::Ord, +{ + for entry in fs::read_dir(path).unwrap() { + let entry = entry.unwrap(); + let path = entry.path(); + if path.is_file() && path.extension().map_or(false, |ext| ext == "bin") + { + let key = path + .file_stem() + .unwrap() + .to_str() + .unwrap() + .parse::() + .map_err(|_| { + ArklibError::Storage( + label.to_owned(), + "Failed to parse key from filename".to_owned(), + ) + }) + .unwrap(); + if !data.contains_key(&key) { + fs::remove_file(path).unwrap(); + } + } + } +} + #[cfg(test)] mod tests { use super::*; From 5ebb50cd4b287970f67f24fd77355584f1a28d1e Mon Sep 17 00:00:00 2001 From: pushkarm029 Date: Thu, 25 Jul 2024 23:51:38 +0530 Subject: [PATCH 05/16] added tests Signed-off-by: pushkarm029 --- fs-storage/src/folder_storage.rs | 251 +++++++++++++++++++++++++++---- fs-storage/src/utils.rs | 34 ----- 2 files changed, 223 insertions(+), 62 deletions(-) diff --git a/fs-storage/src/folder_storage.rs b/fs-storage/src/folder_storage.rs index fcb2e200..8de9a6c7 100644 --- a/fs-storage/src/folder_storage.rs +++ b/fs-storage/src/folder_storage.rs @@ -10,7 +10,6 @@ use std::{ use crate::base_storage::{BaseStorage, SyncStatus}; use crate::monoid::Monoid; // use crate::utils::read_version_2_fs; -use crate::utils::remove_files_not_in_ram; use data_error::{ArklibError, Result}; /* @@ -172,6 +171,33 @@ where } Ok(data) } + + fn remove_files_not_in_ram(&mut self) { + for entry in fs::read_dir(self.path.clone()).unwrap() { + let entry = entry.unwrap(); + let path = entry.path(); + if path.is_file() + && path.extension().map_or(false, |ext| ext == "bin") + { + let key = path + .file_stem() + .unwrap() + .to_str() + .unwrap() + .parse::() + .map_err(|_| { + ArklibError::Storage( + self.label.to_owned(), + "Failed to parse key from filename".to_owned(), + ) + }) + .unwrap(); + if !self.data.entries.contains_key(&key) { + fs::remove_file(path).unwrap(); + } + } + } + } } impl BaseStorage for FolderStorage @@ -219,47 +245,79 @@ where if let Ok(metadata) = fs::metadata(&file_path) { if let Ok(disk_timestamp) = metadata.modified() { match ram_timestamp.cmp(&disk_timestamp) { - std::cmp::Ordering::Greater => ram_newer = true, - std::cmp::Ordering::Less => disk_newer = true, + std::cmp::Ordering::Greater => { + ram_newer = true; + log::debug!( + "RAM newer: file {} is newer in RAM", + file_path.display() + ); + } + std::cmp::Ordering::Less => { + disk_newer = true; + log::debug!( + "Disk newer: file {} is newer on disk, ram: {}, disk: {}", + file_path.display(), + ram_timestamp.elapsed().unwrap().as_secs(), + disk_timestamp.elapsed().unwrap().as_secs() + ); + } std::cmp::Ordering::Equal => {} } } else { // If we can't read the disk timestamp, assume RAM is newer ram_newer = true; + log::debug!( + "RAM newer: couldn't read disk timestamp for {}", + file_path.display() + ); } } else { // If the file doesn't exist on disk, RAM is newer ram_newer = true; + log::debug!( + "RAM newer: file {} doesn't exist on disk", + file_path.display() + ); } // If we've found both RAM and disk modifications, we can stop checking if ram_newer && disk_newer { + log::debug!( + "Both RAM and disk modifications found, stopping check" + ); break; } } - // Check for files on disk that aren't in RAM - for entry in fs::read_dir(&self.path)? { - let entry = entry?; - let path = entry.path(); - if path.is_file() - && path.extension().map_or(false, |ext| ext == "bin") - { - let key = path - .file_stem() - .unwrap() - .to_str() - .unwrap() - .parse::() - .map_err(|_| { - ArklibError::Storage( - self.label.clone(), - "Failed to parse key from filename".to_owned(), - ) - })?; - if !self.ram_timestamps.contains_key(&key) { - disk_newer = true; - break; + // Skip this check if this divergent condition has already been reached + if !ram_newer || !disk_newer { + // Check for files on disk that aren't in RAM + for entry in fs::read_dir(&self.path)? { + let entry = entry?; + let path = entry.path(); + if path.is_file() + && path.extension().map_or(false, |ext| ext == "bin") + { + let key = path + .file_stem() + .unwrap() + .to_str() + .unwrap() + .parse::() + .map_err(|_| { + ArklibError::Storage( + self.label.clone(), + "Failed to parse key from filename".to_owned(), + ) + })?; + if !self.ram_timestamps.contains_key(&key) { + disk_newer = true; + log::debug!( + "Disk newer: file {} exists on disk but not in RAM", + path.display() + ); + break; + } } } } @@ -274,7 +332,6 @@ where log::info!("{} sync status is {}", self.label, status); Ok(status) } - /// Sync the in-memory storage with the storage on disk fn sync(&mut self) -> Result<()> { match self.sync_status()? { @@ -328,10 +385,12 @@ where self.disk_timestamps .insert(key.clone(), new_timestamp); + self.ram_timestamps + .insert(key.clone(), new_timestamp); } // Remove files for keys that no longer exist - remove_files_not_in_ram(&self.path, &self.label, &self.data.entries); + self.remove_files_not_in_ram(); log::info!( "{} {} entries have been written", @@ -343,7 +402,7 @@ where /// Erase the folder from disk fn erase(&self) -> Result<()> { - fs::remove_dir(&self.path).map_err(|err| { + fs::remove_dir_all(&self.path).map_err(|err| { ArklibError::Storage(self.label.clone(), err.to_string()) }) } @@ -376,3 +435,139 @@ where &self.data.entries } } + +#[cfg(test)] +mod tests { + use crate::{ + base_storage::{BaseStorage, SyncStatus}, + folder_storage::FolderStorage, + }; + use std::{fs, thread, time::Duration}; + + use tempdir::TempDir; + + #[test] + fn test_folder_storage_write_read() { + let temp_dir = + TempDir::new("tmp").expect("Failed to create temporary directory"); + let mut storage = + FolderStorage::new("test".to_owned(), temp_dir.path()).unwrap(); + + storage.set("key1".to_owned(), "value1".to_string()); + storage.set("key2".to_owned(), "value2".to_string()); + + assert!(storage.remove(&"key1".to_string()).is_ok()); + storage + .write_fs() + .expect("Failed to write data to disk"); + + let data_read = storage + .read_fs() + .expect("Failed to read data from disk"); + assert_eq!(data_read.len(), 1); + assert_eq!(data_read.get("key2"), Some(&"value2".to_string())); + } + + #[test] + fn test_folder_storage_auto_delete() { + let temp_dir = + TempDir::new("tmp").expect("Failed to create temporary directory"); + let mut storage = + FolderStorage::new("test".to_owned(), temp_dir.path()).unwrap(); + + storage.set("key1".to_string(), "value1".to_string()); + storage.set("key1".to_string(), "value2".to_string()); + assert!(storage.write_fs().is_ok()); + assert_eq!(temp_dir.path().exists(), true); + + if let Err(err) = storage.erase() { + panic!("Failed to delete file: {:?}", err); + } + assert!(!temp_dir.path().exists()); + } + + #[test] + fn test_folder_metadata_timestamp_updated() { + let temp_dir = + TempDir::new("tmp").expect("Failed to create temporary directory"); + let mut storage = + FolderStorage::new("test".to_owned(), temp_dir.path()).unwrap(); + storage.write_fs().unwrap(); + + storage.set("key1".to_string(), "value1".to_string()); + let before_write = fs::metadata(&temp_dir.path()) + .unwrap() + .modified() + .unwrap(); + thread::sleep(Duration::from_millis(10)); + storage.write_fs().unwrap(); + let after_write = fs::metadata(&temp_dir.path()) + .unwrap() + .modified() + .unwrap(); + println!( + "before_write: {:?}, after_write: {:?}", + before_write, after_write + ); + assert!(before_write < after_write); + } + + #[test] + fn test_folder_storage_is_storage_updated() { + let temp_dir = + TempDir::new("tmp").expect("Failed to create temporary directory"); + let mut storage = + FolderStorage::new("test".to_owned(), temp_dir.path()).unwrap(); + storage.write_fs().unwrap(); + assert_eq!(storage.sync_status().unwrap(), SyncStatus::InSync); + + storage.set("key1".to_string(), "value1".to_string()); + assert_eq!(storage.sync_status().unwrap(), SyncStatus::StorageStale); + storage.write_fs().unwrap(); + assert_eq!(storage.sync_status().unwrap(), SyncStatus::InSync); + + // External data manipulation + let mut mirror_storage = FolderStorage::new( + "MirrorTestStorage".to_string(), + temp_dir.path(), + ) + .unwrap(); + assert_eq!(mirror_storage.sync_status().unwrap(), SyncStatus::InSync); + + mirror_storage.set("key1".to_string(), "value3".to_string()); + assert_eq!( + mirror_storage.sync_status().unwrap(), + SyncStatus::StorageStale + ); + + mirror_storage.write_fs().unwrap(); + assert_eq!(mirror_storage.sync_status().unwrap(), SyncStatus::InSync); + + // receive updates from external data manipulation + assert_eq!(storage.sync_status().unwrap(), SyncStatus::MappingStale); + storage.read_fs().unwrap(); + assert_eq!(storage.sync_status().unwrap(), SyncStatus::InSync); + assert_eq!(mirror_storage.sync_status().unwrap(), SyncStatus::InSync); + } + + #[test] + fn test_monoid_combine() { + let temp_dir = + TempDir::new("tmp").expect("Failed to create temporary directory"); + let mut storage1 = + FolderStorage::new("test".to_owned(), temp_dir.path()).unwrap(); + let mut storage2 = + FolderStorage::new("test".to_owned(), temp_dir.path()).unwrap(); + + storage1.set("key1".to_string(), 2); + storage1.set("key2".to_string(), 6); + + storage2.set("key1".to_string(), 3); + storage2.set("key3".to_string(), 9); + + storage1.merge_from(&storage2).unwrap(); + assert_eq!(storage1.as_ref().get("key1"), Some(&3)); + assert_eq!(storage1.as_ref().get("key2"), Some(&6)); + assert_eq!(storage1.as_ref().get("key3"), Some(&9)); + } +} diff --git a/fs-storage/src/utils.rs b/fs-storage/src/utils.rs index cce9e600..b5b6830a 100644 --- a/fs-storage/src/utils.rs +++ b/fs-storage/src/utils.rs @@ -1,7 +1,5 @@ -use data_error::ArklibError; use data_error::Result; use std::collections::BTreeMap; -use std::fs; use std::path::Path; /// Parses version 2 `FileStorage` format and returns the data as a BTreeMap @@ -54,38 +52,6 @@ where Ok(data) } -pub fn remove_files_not_in_ram( - path: &Path, - label: &str, - data: &BTreeMap, -) where - K: std::str::FromStr + std::cmp::Ord, -{ - for entry in fs::read_dir(path).unwrap() { - let entry = entry.unwrap(); - let path = entry.path(); - if path.is_file() && path.extension().map_or(false, |ext| ext == "bin") - { - let key = path - .file_stem() - .unwrap() - .to_str() - .unwrap() - .parse::() - .map_err(|_| { - ArklibError::Storage( - label.to_owned(), - "Failed to parse key from filename".to_owned(), - ) - }) - .unwrap(); - if !data.contains_key(&key) { - fs::remove_file(path).unwrap(); - } - } - } -} - #[cfg(test)] mod tests { use super::*; From 7af00942f1b5419815477c874bfd2c697d31fad1 Mon Sep 17 00:00:00 2001 From: pushkarm029 Date: Mon, 29 Jul 2024 20:46:48 +0530 Subject: [PATCH 06/16] fix Signed-off-by: pushkarm029 --- fs-atomic-versions/src/atomic/file.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/fs-atomic-versions/src/atomic/file.rs b/fs-atomic-versions/src/atomic/file.rs index bd3f2571..65dde94c 100644 --- a/fs-atomic-versions/src/atomic/file.rs +++ b/fs-atomic-versions/src/atomic/file.rs @@ -1,6 +1,6 @@ use std::fs::{self, File}; use std::io::{Error, ErrorKind, Read, Result}; -#[cfg(target_os = "unix")] +#[cfg(target_family = "unix")] use std::os::unix::fs::MetadataExt; use std::path::{Path, PathBuf}; @@ -229,7 +229,7 @@ impl AtomicFile { // May return `EEXIST`. let res = std::fs::hard_link(&new.path, new_path); if let Err(err) = res { - #[cfg(target_os = "unix")] + #[cfg(target_family = "unix")] // From open(2) manual page: // // "[...] create a unique file on the same filesystem (e.g., @@ -241,7 +241,7 @@ impl AtomicFile { if new.path.metadata()?.nlink() != 2 { Err(err)?; } - #[cfg(not(target_os = "unix"))] + #[cfg(not(target_family = "unix"))] Err(err)?; } From 3c82844bdb57be8713fc99acb9dc0257d11ef630 Mon Sep 17 00:00:00 2001 From: pushkarm029 Date: Mon, 29 Jul 2024 20:55:00 +0530 Subject: [PATCH 07/16] fix Signed-off-by: pushkarm029 --- fs-storage/src/jni/file_storage.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/fs-storage/src/jni/file_storage.rs b/fs-storage/src/jni/file_storage.rs index 20820cd0..c1080af6 100644 --- a/fs-storage/src/jni/file_storage.rs +++ b/fs-storage/src/jni/file_storage.rs @@ -45,7 +45,7 @@ pub extern "system" fn Java_dev_arkbuilders_core_FileStorage_create<'local>( let file_storage: FileStorage = FileStorage::new(label, Path::new(&path)).unwrap_or_else(|err| { - env.throw_new("java/lang/RuntimeException", &err.to_string()) + env.throw_new("java/lang/RuntimeException", err.to_string()) .expect("Failed to throw RuntimeException"); FileStorage::new("".to_string(), Path::new("")).unwrap() }); @@ -77,7 +77,7 @@ pub extern "system" fn Java_dev_arkbuilders_core_FileStorage_remove<'local>( FileStorage::from_jlong(file_storage_ptr) .remove(&id) .unwrap_or_else(|err| { - env.throw_new("java/lang/RuntimeException", &err.to_string()) + env.throw_new("java/lang/RuntimeException", err.to_string()) .unwrap(); }); } @@ -114,7 +114,7 @@ pub extern "system" fn Java_dev_arkbuilders_core_FileStorage_sync( FileStorage::from_jlong(file_storage_ptr) .sync() .unwrap_or_else(|err| { - env.throw_new("java/lang/RuntimeException", &err.to_string()) + env.throw_new("java/lang/RuntimeException", err.to_string()) .unwrap(); }); } @@ -129,7 +129,7 @@ pub extern "system" fn Java_dev_arkbuilders_core_FileStorage_readFS( match FileStorage::from_jlong(file_storage_ptr).read_fs() { Ok(data) => data.clone(), Err(err) => { - env.throw_new("java/lang/RuntimeException", &err.to_string()) + env.throw_new("java/lang/RuntimeException", err.to_string()) .expect("Failed to throw RuntimeException"); return JObject::null().into_raw(); } @@ -202,7 +202,7 @@ pub extern "system" fn Java_dev_arkbuilders_core_FileStorage_writeFS( FileStorage::from_jlong(file_storage_ptr) .write_fs() .unwrap_or_else(|err| { - env.throw_new("java/lang/RuntimeException", &err.to_string()) + env.throw_new("java/lang/RuntimeException", err.to_string()) .unwrap(); }); } @@ -219,7 +219,7 @@ pub extern "system" fn Java_dev_arkbuilders_core_FileStorage_erase( Box::from_raw(file_storage_ptr as *mut FileStorage) }; file_storage.erase().unwrap_or_else(|err| { - env.throw_new("java/lang/RuntimeException", &err.to_string()) + env.throw_new("java/lang/RuntimeException", err.to_string()) .unwrap(); }); } @@ -234,7 +234,7 @@ pub extern "system" fn Java_dev_arkbuilders_core_FileStorage_merge( FileStorage::from_jlong(file_storage_ptr) .merge_from(FileStorage::from_jlong(other_file_storage_ptr)) .unwrap_or_else(|err| { - env.throw_new("java/lang/RuntimeException", &err.to_string()) + env.throw_new("java/lang/RuntimeException", err.to_string()) .unwrap(); }); } From 46f53f931bce349c30fbf73cf463453103463b5d Mon Sep 17 00:00:00 2001 From: pushkarm029 Date: Fri, 9 Aug 2024 21:49:53 +0530 Subject: [PATCH 08/16] few fixes Signed-off-by: pushkarm029 --- fs-storage/examples/cli.rs | 124 +++++++++++++++++++++++++++---- fs-storage/src/folder_storage.rs | 93 ++++++++++++++--------- 2 files changed, 170 insertions(+), 47 deletions(-) diff --git a/fs-storage/examples/cli.rs b/fs-storage/examples/cli.rs index 7b2ab23d..a731811a 100644 --- a/fs-storage/examples/cli.rs +++ b/fs-storage/examples/cli.rs @@ -1,6 +1,7 @@ use anyhow::{Context, Result}; use fs_storage::base_storage::BaseStorage; use fs_storage::file_storage::FileStorage; +use fs_storage::folder_storage::FolderStorage; use serde_json::Value; use std::env; use std::fs; @@ -16,26 +17,41 @@ fn run() -> Result<()> { let args: Vec = env::args().collect(); if args.len() < 3 { println!("Usage:"); - println!(" cargo run --example cli write [JSON_FILE_PATH | KEY_VALUE_PAIRS]"); - println!(" cargo run --example cli read "); + println!(" cargo run --example cli (file | folder) write [JSON_FILE_PATH | KEY_VALUE_PAIRS]"); + println!(" cargo run --example cli (file | folder) read "); return Ok(()); } - let command = &args[1]; - let path = &args[2]; - match command.as_str() { - "read" => read_command(&args, path), - "write" => write_command(&args, path), + let storage_type = &args[1]; + let command = &args[2]; + let path = &args[3]; + match storage_type.as_str() { + "file" => match command.as_str() { + "read" => file_read_command(&args, path), + "write" => file_write_command(&args, path), + _ => { + eprintln!("Invalid command. Use 'read' or 'write'."); + Ok(()) + } + }, + "folder" => match command.as_str() { + "read" => folder_read_command(&args, path), + "write" => folder_write_command(&args, path), + _ => { + eprintln!("Invalid command. Use 'read' or 'write'."); + Ok(()) + } + }, _ => { - eprintln!("Invalid command. Use 'read' or 'write'."); + eprintln!("Invalid storage. Use 'file' or 'folder'."); Ok(()) } } } -fn read_command(args: &[String], path: &str) -> Result<()> { +fn file_read_command(args: &[String], path: &str) -> Result<()> { let keys = if args.len() > 3 { - args[3] + args[4] .split(',') .map(|s| s.to_string()) .collect::>() @@ -66,13 +82,13 @@ fn read_command(args: &[String], path: &str) -> Result<()> { Ok(()) } -fn write_command(args: &[String], path: &str) -> Result<()> { +fn file_write_command(args: &[String], path: &str) -> Result<()> { if args.len() < 4 { - println!("Usage: cargo run --example cli write [JSON_FILE_PATH | KEY_VALUE_PAIRS]"); + println!("Usage: cargo run --example cli file write [JSON_FILE_PATH | KEY_VALUE_PAIRS]"); return Ok(()); } - let content = &args[3]; + let content = &args[4]; // Check if the content is a JSON file path let content_json = Path::new(content) .extension() @@ -110,5 +126,87 @@ fn write_command(args: &[String], path: &str) -> Result<()> { } } } + fs.write_fs().expect("Failed to write to file"); + Ok(()) +} + +fn folder_read_command(args: &[String], path: &str) -> Result<()> { + let keys = if args.len() > 3 { + args[4] + .split(',') + .map(|s| s.to_string()) + .collect::>() + } else { + vec![] + }; + + let mut fs: FolderStorage = + FolderStorage::new("cli".to_string(), Path::new(path)) + .context("Failed to create FolderStorage")?; + + let map = fs + .read_fs() + .expect("No Data is present on this path"); + if keys.is_empty() { + for (key, value) in map { + println!("{}: {}", key, value); + } + } + for key in &keys { + if let Some(value) = map.get(key) { + println!("{}: {}", key, value); + } else { + eprintln!("Key '{}' not found", key); + } + } + + Ok(()) +} + +fn folder_write_command(args: &[String], path: &str) -> Result<()> { + if args.len() < 4 { + println!("Usage: cargo run --example cli folder write [JSON_FILE_PATH | KEY_VALUE_PAIRS]"); + return Ok(()); + } + + let content = &args[4]; + // Check if the content is a JSON file path + let content_json = Path::new(content) + .extension() + .map_or(false, |ext| ext == "json"); + + let mut fs: FolderStorage = + FolderStorage::new("cli".to_string(), Path::new(path)) + .context("Failed to create FolderStorage")?; + if content_json { + let content = + fs::read_to_string(content).context("Failed to read JSON file")?; + let json: Value = + serde_json::from_str(&content).context("Failed to parse JSON")?; + if let Value::Object(object) = json { + for (key, value) in object { + if let Value::String(value_str) = value { + fs.set(key, value_str); + } else { + println!( + "Warning: Skipping non-string value for key '{}'", + key + ); + } + } + } else { + println!("JSON value is not an object"); + return Ok(()); + } + } else { + let pairs = content.split(','); + for pair in pairs { + let kv: Vec<&str> = pair.split(':').collect(); + if kv.len() == 2 { + fs.set(kv[0].to_string(), kv[1].to_string()); + } + } + } + fs.write_fs().expect("Failed to write to folder"); Ok(()) } diff --git a/fs-storage/src/folder_storage.rs b/fs-storage/src/folder_storage.rs index 8de9a6c7..287a2b04 100644 --- a/fs-storage/src/folder_storage.rs +++ b/fs-storage/src/folder_storage.rs @@ -1,4 +1,3 @@ -use serde::{Deserialize, Serialize}; use std::fs::{self, File}; use std::io::{Read, Write}; use std::time::SystemTime; @@ -9,7 +8,6 @@ use std::{ use crate::base_storage::{BaseStorage, SyncStatus}; use crate::monoid::Monoid; -// use crate::utils::read_version_2_fs; use data_error::{ArklibError, Result}; /* @@ -24,7 +22,7 @@ Starting from version 3, data is stored in JSON format. For backward compatibility, we provide a helper function `read_version_2_fs` to read version 2 format. */ -const STORAGE_VERSION: i32 = 3; +// const STORAGE_VERSION: i32 = 3; /// Represents a folder storage system that persists data to disk. pub struct FolderStorage @@ -33,7 +31,7 @@ where { /// Label for logging label: String, - /// Path to the underlying file where data is persisted + /// Path to the underlying folder where data is persisted path: PathBuf, /// `ram_timestamps` can be used to track the last time a file was modified in memory. /// where the key is the path of the file inside the directory. @@ -48,12 +46,10 @@ where /// /// /// This is the data that is serialized and deserialized to and from disk. -#[derive(Serialize, Deserialize)] pub struct FolderStorageData where K: Ord, { - version: i32, entries: BTreeMap, } @@ -92,7 +88,6 @@ where ram_timestamps: BTreeMap::new(), disk_timestamps: BTreeMap::new(), data: FolderStorageData { - version: STORAGE_VERSION, entries: BTreeMap::new(), }, }; @@ -109,7 +104,7 @@ where if !self.path.exists() { return Err(ArklibError::Storage( self.label.clone(), - "File does not exist".to_owned(), + "Folder does not exist".to_owned(), )); } @@ -121,26 +116,33 @@ where } let mut data = FolderStorageData { - version: STORAGE_VERSION, entries: BTreeMap::new(), }; self.disk_timestamps.clear(); self.ram_timestamps.clear(); - // read_version_2_fs : unimplemented!() - for entry in fs::read_dir(&self.path)? { let entry = entry?; let path = entry.path(); if path.is_file() && path.extension().map_or(false, |ext| ext == "bin") { - let key = path - .file_stem() - .unwrap() + let file_stem = path.file_stem().ok_or_else(|| { + ArklibError::Storage( + self.label.clone(), + "Failed to extract file stem from filename".to_owned(), + ) + }); + + let key = file_stem? .to_str() - .unwrap() + .ok_or_else(|| { + ArklibError::Storage( + self.label.clone(), + "Failed to convert file stem to string".to_owned(), + ) + })? .parse::() .map_err(|_| { ArklibError::Storage( @@ -172,31 +174,55 @@ where Ok(data) } - fn remove_files_not_in_ram(&mut self) { - for entry in fs::read_dir(self.path.clone()).unwrap() { - let entry = entry.unwrap(); + fn remove_files_not_in_ram(&mut self) -> Result<()> { + let dir = fs::read_dir(&self.path).map_err(|e| { + ArklibError::Storage( + self.label.clone(), + format!("Failed to read directory: {}", e), + ) + })?; + + for entry in dir { + let entry = entry.map_err(|e| { + ArklibError::Storage( + self.label.clone(), + format!("Failed to read directory entry: {}", e), + ) + })?; + let path = entry.path(); if path.is_file() - && path.extension().map_or(false, |ext| ext == "bin") + && path.extension().and_then(|ext| ext.to_str()) == Some("bin") { - let key = path + let file_stem = path .file_stem() - .unwrap() - .to_str() - .unwrap() - .parse::() - .map_err(|_| { + .and_then(|s| s.to_str()) + .ok_or_else(|| { ArklibError::Storage( - self.label.to_owned(), - "Failed to parse key from filename".to_owned(), + self.label.clone(), + "Invalid file name".to_owned(), ) - }) - .unwrap(); + })?; + + let key = file_stem.parse::().map_err(|_| { + ArklibError::Storage( + self.label.clone(), + "Failed to parse key from filename".to_owned(), + ) + })?; + if !self.data.entries.contains_key(&key) { - fs::remove_file(path).unwrap(); + if let Err(e) = fs::remove_file(&path) { + ArklibError::Storage( + self.label.clone(), + format!("Failed to remove file {:?}: {}", path, e), + ); + } } } } + + Ok(()) } } @@ -225,8 +251,6 @@ where self.data.entries.remove(id).ok_or_else(|| { ArklibError::Storage(self.label.clone(), "Key not found".to_owned()) })?; - // self.ram_timestamps.remove(id); - // OR self.ram_timestamps .insert(id.clone(), SystemTime::now()); Ok(()) @@ -332,6 +356,7 @@ where log::info!("{} sync status is {}", self.label, status); Ok(status) } + /// Sync the in-memory storage with the storage on disk fn sync(&mut self) -> Result<()> { match self.sync_status()? { @@ -390,7 +415,7 @@ where } // Remove files for keys that no longer exist - self.remove_files_not_in_ram(); + self.remove_files_not_in_ram().unwrap(); log::info!( "{} {} entries have been written", @@ -481,7 +506,7 @@ mod tests { assert_eq!(temp_dir.path().exists(), true); if let Err(err) = storage.erase() { - panic!("Failed to delete file: {:?}", err); + panic!("Failed to delete folder: {:?}", err); } assert!(!temp_dir.path().exists()); } From 353379ee38aba3b9b95ba365a2a22c12e7cd1f10 Mon Sep 17 00:00:00 2001 From: pushkarm029 Date: Fri, 9 Aug 2024 21:52:21 +0530 Subject: [PATCH 09/16] fmt Signed-off-by: pushkarm029 --- fs-storage/examples/cli.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/fs-storage/examples/cli.rs b/fs-storage/examples/cli.rs index 2def6556..9d327364 100644 --- a/fs-storage/examples/cli.rs +++ b/fs-storage/examples/cli.rs @@ -1,5 +1,8 @@ use anyhow::{Context, Result}; -use fs_storage::{base_storage::BaseStorage, file_storage::FileStorage, folder_storage::FolderStorage}; +use fs_storage::{ + base_storage::BaseStorage, file_storage::FileStorage, + folder_storage::FolderStorage, +}; use serde_json::Value; use std::{env, fs, path::Path}; From 0bdd7add20e0f37f214196dbe94bf08fdd1ce778 Mon Sep 17 00:00:00 2001 From: Ishan Bhanuka Date: Thu, 22 Aug 2024 00:40:52 -0400 Subject: [PATCH 10/16] Add property based testing for FolderStorage (#86) * Add property test * Use print statements * experimental changes Signed-off-by: pushkarm029 * fix Signed-off-by: pushkarm029 * fix Signed-off-by: pushkarm029 * Refactor to remove unwraps * Fix folderstorage model with strong deletes --------- Signed-off-by: pushkarm029 Co-authored-by: pushkarm029 --- fs-storage/Cargo.toml | 3 + fs-storage/src/base_storage.rs | 2 +- fs-storage/src/file_storage.rs | 2 +- fs-storage/src/folder_storage.rs | 508 ++++++++++++++++++++++--------- 4 files changed, 370 insertions(+), 145 deletions(-) diff --git a/fs-storage/Cargo.toml b/fs-storage/Cargo.toml index cf64ff58..2a07869e 100644 --- a/fs-storage/Cargo.toml +++ b/fs-storage/Cargo.toml @@ -23,7 +23,10 @@ data-error = { path = "../data-error" } [dev-dependencies] anyhow = "1.0.81" +quickcheck = { version = "1.0.3", features = ["use_logging"] } +quickcheck_macros = "1.0.0" tempdir = "0.3.7" +test-log = "0.2.16" [features] default = ["jni-bindings"] diff --git a/fs-storage/src/base_storage.rs b/fs-storage/src/base_storage.rs index 4eba03b0..73db729f 100644 --- a/fs-storage/src/base_storage.rs +++ b/fs-storage/src/base_storage.rs @@ -54,7 +54,7 @@ pub trait BaseStorage: AsRef> { fn remove(&mut self, id: &K) -> Result<()>; /// Get [`SyncStatus`] of the storage - fn sync_status(&self) -> Result; + fn sync_status(&mut self) -> Result; /// Sync the in-memory storage with the storage on disk fn sync(&mut self) -> Result<()>; diff --git a/fs-storage/src/file_storage.rs b/fs-storage/src/file_storage.rs index 9b762bbd..26a50969 100644 --- a/fs-storage/src/file_storage.rs +++ b/fs-storage/src/file_storage.rs @@ -191,7 +191,7 @@ where /// Compare the timestamp of the storage file /// with the timestamp of the in-memory storage and the last written /// to time to determine if either of the two requires syncing. - fn sync_status(&self) -> Result { + fn sync_status(&mut self) -> Result { let file_updated = fs::metadata(&self.path)?.modified()?; // Determine the synchronization status based on the modification times diff --git a/fs-storage/src/folder_storage.rs b/fs-storage/src/folder_storage.rs index 287a2b04..7233b8c8 100644 --- a/fs-storage/src/folder_storage.rs +++ b/fs-storage/src/folder_storage.rs @@ -1,3 +1,4 @@ +use std::collections::BTreeSet; use std::fs::{self, File}; use std::io::{Read, Write}; use std::time::SystemTime; @@ -10,25 +11,8 @@ use crate::base_storage::{BaseStorage, SyncStatus}; use crate::monoid::Monoid; use data_error::{ArklibError, Result}; -/* -Note on `FolderStorage` Versioning: - -`FolderStorage` is a basic key-value storage system that persists data to disk. -where the key is the path of the file inside the directory. - - -In version 2, `FolderStorage` stored data in a plaintext format. -Starting from version 3, data is stored in JSON format. - -For backward compatibility, we provide a helper function `read_version_2_fs` to read version 2 format. -*/ -// const STORAGE_VERSION: i32 = 3; - /// Represents a folder storage system that persists data to disk. -pub struct FolderStorage -where - K: Ord, -{ +pub struct FolderStorage { /// Label for logging label: String, /// Path to the underlying folder where data is persisted @@ -40,16 +24,12 @@ where /// where the key is the path of the file inside the directory. disk_timestamps: BTreeMap, data: FolderStorageData, + /// Temporary store for deleted keys until storage is synced + deleted_keys: BTreeSet, } /// A struct that represents the data stored in a [`FolderStorage`] instance. -/// -/// -/// This is the data that is serialized and deserialized to and from disk. -pub struct FolderStorageData -where - K: Ord, -{ +pub struct FolderStorageData { entries: BTreeMap, } @@ -70,11 +50,7 @@ where + serde::de::DeserializeOwned + std::str::FromStr + std::fmt::Display, - V: Clone - + serde::Serialize - + serde::de::DeserializeOwned - + std::str::FromStr - + Monoid, + V: Clone + serde::Serialize + serde::de::DeserializeOwned + Monoid, { /// Create a new folder storage with a diagnostic label and directory path /// The storage will be initialized using the disk data, if the path exists @@ -90,6 +66,7 @@ where data: FolderStorageData { entries: BTreeMap::new(), }, + deleted_keys: BTreeSet::new(), }; if Path::exists(path) { @@ -100,7 +77,7 @@ where } /// Load mapping from folder storage - fn load_fs_data(&mut self) -> Result> { + fn load_fs_data(&mut self) -> Result<()> { if !self.path.exists() { return Err(ArklibError::Storage( self.label.clone(), @@ -119,38 +96,13 @@ where entries: BTreeMap::new(), }; - self.disk_timestamps.clear(); - self.ram_timestamps.clear(); - for entry in fs::read_dir(&self.path)? { let entry = entry?; let path = entry.path(); if path.is_file() && path.extension().map_or(false, |ext| ext == "bin") { - let file_stem = path.file_stem().ok_or_else(|| { - ArklibError::Storage( - self.label.clone(), - "Failed to extract file stem from filename".to_owned(), - ) - }); - - let key = file_stem? - .to_str() - .ok_or_else(|| { - ArklibError::Storage( - self.label.clone(), - "Failed to convert file stem to string".to_owned(), - ) - })? - .parse::() - .map_err(|_| { - ArklibError::Storage( - self.label.clone(), - "Failed to parse key from filename".to_owned(), - ) - })?; - + let key: K = self.extract_key_from_file_path(&path)?; let mut file = File::open(&path)?; let mut buffer = Vec::new(); file.read_to_end(&mut buffer)?; @@ -171,59 +123,90 @@ where } } } - Ok(data) + + self.data = data; + Ok(()) } - fn remove_files_not_in_ram(&mut self) -> Result<()> { - let dir = fs::read_dir(&self.path).map_err(|e| { - ArklibError::Storage( - self.label.clone(), - format!("Failed to read directory: {}", e), - ) - })?; + /// Resolve differences between memory and disk data + fn resolve_divergence(&mut self) -> Result<()> { + let new_data = FolderStorage::new("new_data".into(), &self.path)?; - for entry in dir { - let entry = entry.map_err(|e| { - ArklibError::Storage( - self.label.clone(), - format!("Failed to read directory entry: {}", e), - ) - })?; + for (key, new_value) in new_data.data.entries.iter() { + if let Some(existing_value) = self.data.entries.get(key) { + let existing_value_updated = self + .ram_timestamps + .get(key) + .map(|ram_stamp| { + self.disk_timestamps + .get(key) + .map(|disk_stamp| ram_stamp > disk_stamp) + }) + .flatten() + .unwrap_or(false); + + // Use monoid to combine value for the given key + // if the memory and disk have diverged + if existing_value_updated { + let resolved_value = V::combine(existing_value, new_value); + self.data + .entries + .insert(key.clone(), resolved_value); + } else { + self.data + .entries + .insert(key.clone(), new_value.clone()); + } + } else { + self.data + .entries + .insert(key.clone(), new_value.clone()); + } + } + + Ok(()) + } + /// Remove files from disk that are not present in memory + fn remove_files_not_in_ram(&mut self) -> Result<()> { + for entry in fs::read_dir(&self.path)? { + let entry = entry?; let path = entry.path(); if path.is_file() - && path.extension().and_then(|ext| ext.to_str()) == Some("bin") + && path.extension().map_or(false, |ext| ext == "bin") { - let file_stem = path - .file_stem() - .and_then(|s| s.to_str()) - .ok_or_else(|| { - ArklibError::Storage( - self.label.clone(), - "Invalid file name".to_owned(), - ) - })?; - - let key = file_stem.parse::().map_err(|_| { - ArklibError::Storage( - self.label.clone(), - "Failed to parse key from filename".to_owned(), - ) - })?; - + let key: K = self.extract_key_from_file_path(&path)?; if !self.data.entries.contains_key(&key) { - if let Err(e) = fs::remove_file(&path) { - ArklibError::Storage( - self.label.clone(), - format!("Failed to remove file {:?}: {}", path, e), - ); - } + fs::remove_file(&path)?; } } } - Ok(()) } + + pub fn extract_key_from_file_path(&self, path: &Path) -> Result { + path.file_stem() + .ok_or_else(|| { + ArklibError::Storage( + self.label.clone(), + "Failed to extract file stem from filename".to_owned(), + ) + })? + .to_str() + .ok_or_else(|| { + ArklibError::Storage( + self.label.clone(), + "Failed to convert file stem to string".to_owned(), + ) + })? + .parse::() + .map_err(|_| { + ArklibError::Storage( + self.label.clone(), + "Failed to parse key from filename".to_owned(), + ) + }) + } } impl BaseStorage for FolderStorage @@ -234,37 +217,42 @@ where + serde::de::DeserializeOwned + std::str::FromStr + std::fmt::Display, - V: Clone - + serde::Serialize - + serde::de::DeserializeOwned - + std::str::FromStr - + Monoid, + V: Clone + serde::Serialize + serde::de::DeserializeOwned + Monoid, { /// Set a key-value pair in the internal mapping fn set(&mut self, key: K, value: V) { self.data.entries.insert(key.clone(), value); + self.deleted_keys.remove(&key); self.ram_timestamps.insert(key, SystemTime::now()); } /// Remove an entry from the internal mapping given a key fn remove(&mut self, id: &K) -> Result<()> { - self.data.entries.remove(id).ok_or_else(|| { - ArklibError::Storage(self.label.clone(), "Key not found".to_owned()) - })?; - self.ram_timestamps - .insert(id.clone(), SystemTime::now()); - Ok(()) + match self.data.entries.remove(id) { + Some(_) => { + self.deleted_keys.insert(id.clone()); + Ok(()) + } + None => Err(ArklibError::Storage( + self.label.clone(), + "Key not found".to_owned(), + )), + } } /// Compare the timestamp of the storage files /// with the timestamps of the in-memory storage and the last written /// to time to determine if either of the two requires syncing. - fn sync_status(&self) -> Result { - let mut ram_newer = false; + fn sync_status(&mut self) -> Result { + let mut ram_newer = !self.deleted_keys.is_empty(); let mut disk_newer = false; - for (key, ram_timestamp) in &self.ram_timestamps { + for key in self.data.entries.keys() { let file_path = self.path.join(format!("{}.bin", key)); + let ram_timestamp = self + .ram_timestamps + .get(key) + .expect("Data entry key should have ram timestamp"); if let Ok(metadata) = fs::metadata(&file_path) { if let Ok(disk_timestamp) = metadata.modified() { @@ -314,7 +302,7 @@ where } // Skip this check if this divergent condition has already been reached - if !ram_newer || !disk_newer { + if !(ram_newer && disk_newer) { // Check for files on disk that aren't in RAM for entry in fs::read_dir(&self.path)? { let entry = entry?; @@ -322,60 +310,51 @@ where if path.is_file() && path.extension().map_or(false, |ext| ext == "bin") { - let key = path - .file_stem() - .unwrap() - .to_str() - .unwrap() - .parse::() - .map_err(|_| { - ArklibError::Storage( - self.label.clone(), - "Failed to parse key from filename".to_owned(), - ) - })?; - if !self.ram_timestamps.contains_key(&key) { + let key = self.extract_key_from_file_path(&path)?; + if !self.data.entries.contains_key(&key) + && !self.deleted_keys.contains(&key) + { disk_newer = true; - log::debug!( - "Disk newer: file {} exists on disk but not in RAM", - path.display() - ); break; } } } } - let status = match (ram_newer, disk_newer) { + let new_status = match (ram_newer, disk_newer) { (false, false) => SyncStatus::InSync, (true, false) => SyncStatus::StorageStale, (false, true) => SyncStatus::MappingStale, (true, true) => SyncStatus::Diverge, }; - log::info!("{} sync status is {}", self.label, status); - Ok(status) + log::info!("{} sync status is {}", self.label, new_status); + Ok(new_status) } /// Sync the in-memory storage with the storage on disk fn sync(&mut self) -> Result<()> { match self.sync_status()? { - SyncStatus::InSync => Ok(()), - SyncStatus::MappingStale => self.read_fs().map(|_| ()), - SyncStatus::StorageStale => self.write_fs().map(|_| ()), + SyncStatus::InSync => {} + SyncStatus::MappingStale => { + self.read_fs()?; + } + SyncStatus::StorageStale => { + self.write_fs()?; + } SyncStatus::Diverge => { - let data = self.load_fs_data()?; - self.merge_from(&data)?; + self.resolve_divergence()?; self.write_fs()?; - Ok(()) } - } + }; + + self.deleted_keys.clear(); + Ok(()) } /// Read the data from folder storage fn read_fs(&mut self) -> Result<&BTreeMap> { - let data = self.load_fs_data()?; - self.data = data; + self.load_fs_data()?; Ok(&self.data.entries) } @@ -414,8 +393,21 @@ where .insert(key.clone(), new_timestamp); } + // Delete files for previously deleted keys + self.deleted_keys.iter().for_each(|key| { + log::debug!("Deleting key: {}", key); + self.data.entries.remove(key); + self.ram_timestamps.remove(key); + self.disk_timestamps.remove(key); + let file_path = self.path.join(format!("{}.bin", key)); + if file_path.exists() { + fs::remove_file(&file_path).expect("Failed to delete file"); + } + }); + self.deleted_keys.clear(); + // Remove files for keys that no longer exist - self.remove_files_not_in_ram().unwrap(); + self.remove_files_not_in_ram()?; log::info!( "{} {} entries have been written", @@ -466,9 +458,19 @@ mod tests { use crate::{ base_storage::{BaseStorage, SyncStatus}, folder_storage::FolderStorage, + monoid::Monoid, + }; + use std::{ + collections::BTreeSet, + fs::{self, File}, + io::Write, + thread, + time::{Duration, SystemTime}, }; - use std::{fs, thread, time::Duration}; + use data_error::Result; + use quickcheck_macros::quickcheck; + use serde::{Deserialize, Serialize}; use tempdir::TempDir; #[test] @@ -595,4 +597,224 @@ mod tests { assert_eq!(storage1.as_ref().get("key2"), Some(&6)); assert_eq!(storage1.as_ref().get("key3"), Some(&9)); } + + use quickcheck::{Arbitrary, Gen}; + use std::collections::{BTreeMap, HashSet}; + + // Assuming FolderStorage, BaseStorage, SyncStatus, and other necessary types are in scope + + #[derive(Clone, Debug)] + enum StorageOperation { + Set(String), + Remove(String), + Sync, + ExternalModify(String), + ExternalSet(String), + } + + #[derive(Clone, Debug)] + struct StorageOperationSequence(Vec); + + impl Arbitrary for StorageOperationSequence { + fn arbitrary(g: &mut Gen) -> Self { + let mut existing_keys = HashSet::new(); + let mut ops = Vec::new(); + let size = usize::arbitrary(g) % 100 + 1; // Generate 1 to 100 operations + + for _ in 0..size { + let op = match u8::arbitrary(g) % 9 { + 0 | 1 | 2 | 3 | 4 => { + let key = u8::arbitrary(g).to_string(); + existing_keys.insert(key.clone()); + StorageOperation::Set(key) + } + 5 if !existing_keys.is_empty() => { + let key = g + .choose( + &existing_keys + .iter() + .cloned() + .collect::>(), + ) + .unwrap() + .clone(); + existing_keys.remove(&key); + StorageOperation::Remove(key) + } + 6 => StorageOperation::Sync, + 7 if !existing_keys.is_empty() => { + let key = g + .choose( + &existing_keys + .iter() + .cloned() + .collect::>(), + ) + .unwrap() + .clone(); + StorageOperation::ExternalModify(key) + } + _ => { + let key = u8::arbitrary(g).to_string(); + StorageOperation::ExternalSet(key) + } + }; + ops.push(op); + } + + StorageOperationSequence(ops) + } + } + + #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] + struct Dummy; + + impl Monoid for Dummy { + fn neutral() -> Dummy { + Dummy + } + + fn combine(_a: &Dummy, _b: &Dummy) -> Dummy { + Dummy + } + } + + // #[test_log::test] + #[quickcheck] + fn prop_folder_storage_correct( + StorageOperationSequence(operations): StorageOperationSequence, + ) { + let temp_dir = + TempDir::new("temp").expect("Failed to create temporary directory"); + let path = temp_dir.path(); + + let mut storage = + FolderStorage::::new("test".to_string(), &path) + .unwrap(); + let mut expected_data: BTreeMap = BTreeMap::new(); + let mut pending_deletes = BTreeSet::new(); + let mut pending_sets: BTreeMap = BTreeMap::new(); + let mut pending_external: BTreeMap = BTreeMap::new(); + + // Check initial state + assert_eq!( + storage.sync_status().unwrap(), + SyncStatus::InSync, + "Storage should be InSync when created" + ); + + let v = Dummy; + for (i, op) in operations.into_iter().enumerate() { + match op { + StorageOperation::Set(k) => { + storage.set(k.clone(), v); + pending_sets.insert(k.clone(), i); + pending_deletes.remove(&k); + + let status = storage.sync_status().unwrap(); + let expected_status = expected_status( + &pending_external, + &pending_sets, + &pending_deletes, + ); + assert_eq!(status, expected_status); + } + StorageOperation::Remove(k) => { + storage.remove(&k).unwrap(); + pending_sets.remove(&k); + pending_deletes.insert(k.clone()); + + let status = storage.sync_status().unwrap(); + let expected_status = expected_status( + &pending_external, + &pending_sets, + &pending_deletes, + ); + assert_eq!(status, expected_status); + } + StorageOperation::Sync => { + storage.sync().unwrap(); + + // Note: Concurrent deletes are overriden by sets + // Hence, deletes are weak. Also, for values where + // monoidal combination is relevant, this logic will + // have to be updated. + pending_sets + .keys() + .chain(pending_external.keys()) + .for_each(|k| { + expected_data.insert(k.clone(), v); + }); + pending_deletes.iter().for_each(|key| { + expected_data.remove(key); + }); + + pending_sets.clear(); + pending_external.clear(); + pending_deletes.clear(); + + let status = storage.sync_status().unwrap(); + assert_eq!(status, SyncStatus::InSync); + assert_eq!(storage.data.entries, expected_data); + } + StorageOperation::ExternalModify(k) + | StorageOperation::ExternalSet(k) => { + perform_external_modification(path, &k, v).unwrap(); + pending_external.insert(k.clone(), i); + let status = storage.sync_status().unwrap(); + let expected_status = expected_status( + &pending_external, + &pending_sets, + &pending_deletes, + ); + assert_eq!(status, expected_status); + } + } + + assert!( + pending_sets + .keys() + .filter(|key| pending_deletes.contains(*key)) + .count() + == 0 + ); + } + } + + fn perform_external_modification( + path: &std::path::Path, + key: &str, + value: Dummy, + ) -> Result<()> { + let mut file = File::create(path.join(format!("{}.bin", key)))?; + let bytes = bincode::serialize(&value).unwrap(); + file.write_all(&bytes)?; + let time = SystemTime::now(); + file.set_modified(time).unwrap(); + file.sync_all()?; + Ok(()) + } + + fn expected_status( + pending_external: &BTreeMap, + pending_sets: &BTreeMap, + pending_deletes: &BTreeSet, + ) -> SyncStatus { + let ram_newer = !pending_deletes.is_empty(); + let ram_newer = ram_newer + || pending_sets + .iter() + .any(|(k, v)| pending_external.get(k).map_or(true, |e| v > e)); + let disk_newer = pending_external + .iter() + .filter(|(k, _)| !pending_deletes.contains(*k)) + .any(|(k, v)| pending_sets.get(k).map_or(true, |s| v > s)); + + match (ram_newer, disk_newer) { + (false, false) => SyncStatus::InSync, + (true, false) => SyncStatus::StorageStale, + (false, true) => SyncStatus::MappingStale, + (true, true) => SyncStatus::Diverge, + } + } } From 68ea33101753471561ce23337df3c3af3fa4367a Mon Sep 17 00:00:00 2001 From: Ishan Bhanuka Date: Thu, 22 Aug 2024 10:14:07 +0530 Subject: [PATCH 11/16] Fix formatting --- fs-storage/src/folder_storage.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/fs-storage/src/folder_storage.rs b/fs-storage/src/folder_storage.rs index 7233b8c8..7893a766 100644 --- a/fs-storage/src/folder_storage.rs +++ b/fs-storage/src/folder_storage.rs @@ -137,12 +137,11 @@ where let existing_value_updated = self .ram_timestamps .get(key) - .map(|ram_stamp| { + .and_then(|ram_stamp| { self.disk_timestamps .get(key) .map(|disk_stamp| ram_stamp > disk_stamp) }) - .flatten() .unwrap_or(false); // Use monoid to combine value for the given key From c53a0b492d068734fa1cb77e23a7e380cf3dd93c Mon Sep 17 00:00:00 2001 From: pushkarm029 Date: Sat, 24 Aug 2024 16:03:01 +0530 Subject: [PATCH 12/16] fix Signed-off-by: pushkarm029 --- fs-storage/README.md | 12 +++-- fs-storage/examples/cli.rs | 4 +- fs-storage/src/folder_storage.rs | 89 +++++++++++--------------------- 3 files changed, 40 insertions(+), 65 deletions(-) diff --git a/fs-storage/README.md b/fs-storage/README.md index cfd68569..6401dce6 100644 --- a/fs-storage/README.md +++ b/fs-storage/README.md @@ -14,22 +14,26 @@ File system storage implementation for writing key value pairs to disk. } ``` +- Select between two storage options: + - file: Stores multiple key-value pairs in a single file. + - folder: Stores each value in a separate file within a folder. + - Run Write Command ```bash -cargo run --example cli write /tmp/z test.json +cargo run --example cli [file|folder] write /tmp/z test.json ``` Alternatively, you can directly provide the input data as a comma-separated list of key-value pairs ```bash -cargo run --example cli write /tmp/z a:1,b:2,c:3 +cargo run --example cli [file|folder] write /tmp/z a:1,b:2,c:3 ``` - Run Read Command ```bash -cargo run --example cli read /tmp/z key1,key2 +cargo run --example cli [file|folder] read /tmp/z key1,key2 ``` - Get Output @@ -42,5 +46,5 @@ key2: value2 - To get all key value pairs ```bash -cargo run --example cli read /tmp/z +cargo run --example cli [file|folder] read /tmp/z ``` diff --git a/fs-storage/examples/cli.rs b/fs-storage/examples/cli.rs index 9d327364..aaa4ad4c 100644 --- a/fs-storage/examples/cli.rs +++ b/fs-storage/examples/cli.rs @@ -16,8 +16,8 @@ fn run() -> Result<()> { let args: Vec = env::args().collect(); if args.len() < 3 { println!("Usage:"); - println!(" cargo run --example cli (file | folder) write [JSON_FILE_PATH | KEY_VALUE_PAIRS]"); - println!(" cargo run --example cli (file | folder) read "); + println!(" cargo run --example cli [file|folder] write [JSON_FILE_PATH | KEY_VALUE_PAIRS]"); + println!(" cargo run --example cli [file|folder] read "); return Ok(()); } diff --git a/fs-storage/src/folder_storage.rs b/fs-storage/src/folder_storage.rs index 7893a766..641c54dc 100644 --- a/fs-storage/src/folder_storage.rs +++ b/fs-storage/src/folder_storage.rs @@ -23,22 +23,17 @@ pub struct FolderStorage { /// `disk_timestamps` can be used to track the last time a file written or read from disk. /// where the key is the path of the file inside the directory. disk_timestamps: BTreeMap, - data: FolderStorageData, + data: BTreeMap, /// Temporary store for deleted keys until storage is synced deleted_keys: BTreeSet, } -/// A struct that represents the data stored in a [`FolderStorage`] instance. -pub struct FolderStorageData { - entries: BTreeMap, -} - -impl AsRef> for FolderStorageData +impl AsRef> for FolderStorage where K: Ord, { fn as_ref(&self) -> &BTreeMap { - &self.entries + &self.data } } @@ -63,9 +58,7 @@ where path: PathBuf::from(path), ram_timestamps: BTreeMap::new(), disk_timestamps: BTreeMap::new(), - data: FolderStorageData { - entries: BTreeMap::new(), - }, + data: BTreeMap::new(), deleted_keys: BTreeSet::new(), }; @@ -92,9 +85,7 @@ where )); } - let mut data = FolderStorageData { - entries: BTreeMap::new(), - }; + let mut data = BTreeMap::new(); for entry in fs::read_dir(&self.path)? { let entry = entry?; @@ -113,7 +104,7 @@ where format!("Failed to deserialize value: {}", e), ) })?; - data.entries.insert(key.clone(), value); + data.insert(key.clone(), value); if let Ok(metadata) = fs::metadata(&path) { if let Ok(modified) = metadata.modified() { @@ -132,8 +123,8 @@ where fn resolve_divergence(&mut self) -> Result<()> { let new_data = FolderStorage::new("new_data".into(), &self.path)?; - for (key, new_value) in new_data.data.entries.iter() { - if let Some(existing_value) = self.data.entries.get(key) { + for (key, new_value) in new_data.data.iter() { + if let Some(existing_value) = self.data.get(key) { let existing_value_updated = self .ram_timestamps .get(key) @@ -148,18 +139,12 @@ where // if the memory and disk have diverged if existing_value_updated { let resolved_value = V::combine(existing_value, new_value); - self.data - .entries - .insert(key.clone(), resolved_value); + self.data.insert(key.clone(), resolved_value); } else { - self.data - .entries - .insert(key.clone(), new_value.clone()); + self.data.insert(key.clone(), new_value.clone()); } } else { - self.data - .entries - .insert(key.clone(), new_value.clone()); + self.data.insert(key.clone(), new_value.clone()); } } @@ -168,16 +153,10 @@ where /// Remove files from disk that are not present in memory fn remove_files_not_in_ram(&mut self) -> Result<()> { - for entry in fs::read_dir(&self.path)? { - let entry = entry?; - let path = entry.path(); - if path.is_file() - && path.extension().map_or(false, |ext| ext == "bin") - { - let key: K = self.extract_key_from_file_path(&path)?; - if !self.data.entries.contains_key(&key) { - fs::remove_file(&path)?; - } + for key in self.deleted_keys.iter() { + let file_path = self.path.join(format!("{}.bin", key)); + if file_path.exists() { + fs::remove_file(&file_path).expect("Failed to delete file"); } } Ok(()) @@ -220,14 +199,14 @@ where { /// Set a key-value pair in the internal mapping fn set(&mut self, key: K, value: V) { - self.data.entries.insert(key.clone(), value); + self.data.insert(key.clone(), value); self.deleted_keys.remove(&key); self.ram_timestamps.insert(key, SystemTime::now()); } /// Remove an entry from the internal mapping given a key fn remove(&mut self, id: &K) -> Result<()> { - match self.data.entries.remove(id) { + match self.data.remove(id) { Some(_) => { self.deleted_keys.insert(id.clone()); Ok(()) @@ -246,7 +225,7 @@ where let mut ram_newer = !self.deleted_keys.is_empty(); let mut disk_newer = false; - for key in self.data.entries.keys() { + for key in self.data.keys() { let file_path = self.path.join(format!("{}.bin", key)); let ram_timestamp = self .ram_timestamps @@ -310,7 +289,7 @@ where && path.extension().map_or(false, |ext| ext == "bin") { let key = self.extract_key_from_file_path(&path)?; - if !self.data.entries.contains_key(&key) + if !self.data.contains_key(&key) && !self.deleted_keys.contains(&key) { disk_newer = true; @@ -354,22 +333,23 @@ where /// Read the data from folder storage fn read_fs(&mut self) -> Result<&BTreeMap> { self.load_fs_data()?; - Ok(&self.data.entries) + Ok(&self.data) } /// Get a value from the internal mapping fn get(&self, id: &K) -> Option<&V> { - self.data.entries.get(id) + self.data.get(id) } - /// Write the data to folder + /// Writes the data to a folder. /// - /// Update the modified timestamp in file metadata to avoid OS timing issues - /// https://github.com/ARK-Builders/ark-rust/pull/63#issuecomment-2163882227 + /// Updates the file's modified timestamp to avoid OS timing issues, which may arise due to file system timestamp precision. + /// EXT3 has 1-second precision, while EXT4 can be more precise but not always. + /// This is addressed by modifying the metadata and calling `sync_all()` after file writes. fn write_fs(&mut self) -> Result<()> { fs::create_dir_all(&self.path)?; - for (key, value) in &self.data.entries { + for (key, value) in &self.data { let file_path = self.path.join(format!("{}.bin", key)); let encoded: Vec = bincode::serialize(value).map_err(|e| { ArklibError::Storage( @@ -395,7 +375,7 @@ where // Delete files for previously deleted keys self.deleted_keys.iter().for_each(|key| { log::debug!("Deleting key: {}", key); - self.data.entries.remove(key); + self.data.remove(key); self.ram_timestamps.remove(key); self.disk_timestamps.remove(key); let file_path = self.path.join(format!("{}.bin", key)); @@ -411,7 +391,7 @@ where log::info!( "{} {} entries have been written", self.label, - self.data.entries.len() + self.data.len() ); Ok(()) } @@ -430,7 +410,7 @@ where { let other_entries = other.as_ref(); for (key, value) in other_entries { - if let Some(existing_value) = self.data.entries.get(key) { + if let Some(existing_value) = self.data.get(key) { let resolved_value = V::combine(existing_value, value); self.set(key.clone(), resolved_value); } else { @@ -443,15 +423,6 @@ where } } -impl AsRef> for FolderStorage -where - K: Ord, -{ - fn as_ref(&self) -> &BTreeMap { - &self.data.entries - } -} - #[cfg(test)] mod tests { use crate::{ @@ -754,7 +725,7 @@ mod tests { let status = storage.sync_status().unwrap(); assert_eq!(status, SyncStatus::InSync); - assert_eq!(storage.data.entries, expected_data); + assert_eq!(storage.data, expected_data); } StorageOperation::ExternalModify(k) | StorageOperation::ExternalSet(k) => { From 283f84298fe6ff29854507339a0409e94f4a4250 Mon Sep 17 00:00:00 2001 From: pushkarm029 Date: Mon, 26 Aug 2024 21:51:02 +0530 Subject: [PATCH 13/16] fix Signed-off-by: pushkarm029 --- fs-storage/src/folder_storage.rs | 84 ++++++++++++++------------------ 1 file changed, 37 insertions(+), 47 deletions(-) diff --git a/fs-storage/src/folder_storage.rs b/fs-storage/src/folder_storage.rs index 641c54dc..ea0af6a4 100644 --- a/fs-storage/src/folder_storage.rs +++ b/fs-storage/src/folder_storage.rs @@ -1,6 +1,6 @@ use std::collections::BTreeSet; use std::fs::{self, File}; -use std::io::{Read, Write}; +use std::io::Write; use std::time::SystemTime; use std::{ collections::BTreeMap, @@ -17,12 +17,12 @@ pub struct FolderStorage { label: String, /// Path to the underlying folder where data is persisted path: PathBuf, - /// `ram_timestamps` can be used to track the last time a file was modified in memory. + /// `modified` can be used to track the last time a file was modified in memory. /// where the key is the path of the file inside the directory. - ram_timestamps: BTreeMap, - /// `disk_timestamps` can be used to track the last time a file written or read from disk. + modified: BTreeMap, + /// `written_to_disk` can be used to track the last time a file written or read from disk. /// where the key is the path of the file inside the directory. - disk_timestamps: BTreeMap, + written_to_disk: BTreeMap, data: BTreeMap, /// Temporary store for deleted keys until storage is synced deleted_keys: BTreeSet, @@ -56,8 +56,8 @@ where let mut storage = Self { label, path: PathBuf::from(path), - ram_timestamps: BTreeMap::new(), - disk_timestamps: BTreeMap::new(), + modified: BTreeMap::new(), + written_to_disk: BTreeMap::new(), data: BTreeMap::new(), deleted_keys: BTreeSet::new(), }; @@ -91,25 +91,24 @@ where let entry = entry?; let path = entry.path(); if path.is_file() - && path.extension().map_or(false, |ext| ext == "bin") + && path.extension().map_or(false, |ext| ext == "txt") { let key: K = self.extract_key_from_file_path(&path)?; - let mut file = File::open(&path)?; - let mut buffer = Vec::new(); - file.read_to_end(&mut buffer)?; - - let value: V = bincode::deserialize(&buffer).map_err(|e| { - ArklibError::Storage( - self.label.clone(), - format!("Failed to deserialize value: {}", e), - ) - })?; + let file = File::open(&path)?; + let value: V = + serde_json::from_reader(file).map_err(|err| { + ArklibError::Storage( + self.label.clone(), + err.to_string(), + ) + })?; + data.insert(key.clone(), value); if let Ok(metadata) = fs::metadata(&path) { if let Ok(modified) = metadata.modified() { - self.disk_timestamps.insert(key.clone(), modified); - self.ram_timestamps.insert(key, modified); + self.written_to_disk.insert(key.clone(), modified); + self.modified.insert(key, modified); } } } @@ -118,7 +117,6 @@ where self.data = data; Ok(()) } - /// Resolve differences between memory and disk data fn resolve_divergence(&mut self) -> Result<()> { let new_data = FolderStorage::new("new_data".into(), &self.path)?; @@ -126,10 +124,10 @@ where for (key, new_value) in new_data.data.iter() { if let Some(existing_value) = self.data.get(key) { let existing_value_updated = self - .ram_timestamps + .modified .get(key) .and_then(|ram_stamp| { - self.disk_timestamps + self.written_to_disk .get(key) .map(|disk_stamp| ram_stamp > disk_stamp) }) @@ -154,7 +152,7 @@ where /// Remove files from disk that are not present in memory fn remove_files_not_in_ram(&mut self) -> Result<()> { for key in self.deleted_keys.iter() { - let file_path = self.path.join(format!("{}.bin", key)); + let file_path = self.path.join(format!("{}.txt", key)); if file_path.exists() { fs::remove_file(&file_path).expect("Failed to delete file"); } @@ -201,7 +199,7 @@ where fn set(&mut self, key: K, value: V) { self.data.insert(key.clone(), value); self.deleted_keys.remove(&key); - self.ram_timestamps.insert(key, SystemTime::now()); + self.modified.insert(key, SystemTime::now()); } /// Remove an entry from the internal mapping given a key @@ -226,9 +224,9 @@ where let mut disk_newer = false; for key in self.data.keys() { - let file_path = self.path.join(format!("{}.bin", key)); + let file_path = self.path.join(format!("{}.txt", key)); let ram_timestamp = self - .ram_timestamps + .modified .get(key) .expect("Data entry key should have ram timestamp"); @@ -286,7 +284,7 @@ where let entry = entry?; let path = entry.path(); if path.is_file() - && path.extension().map_or(false, |ext| ext == "bin") + && path.extension().map_or(false, |ext| ext == "txt") { let key = self.extract_key_from_file_path(&path)?; if !self.data.contains_key(&key) @@ -350,35 +348,27 @@ where fs::create_dir_all(&self.path)?; for (key, value) in &self.data { - let file_path = self.path.join(format!("{}.bin", key)); - let encoded: Vec = bincode::serialize(value).map_err(|e| { - ArklibError::Storage( - self.label.clone(), - format!("Failed to serialize value: {}", e), - ) - })?; - + let file_path = self.path.join(format!("{}.txt", key)); let mut file = File::create(&file_path)?; - file.write_all(&encoded)?; + file.write_all(serde_json::to_string_pretty(&value)?.as_bytes())?; file.flush()?; let new_timestamp = SystemTime::now(); file.set_modified(new_timestamp)?; file.sync_all()?; - self.disk_timestamps - .insert(key.clone(), new_timestamp); - self.ram_timestamps + self.written_to_disk .insert(key.clone(), new_timestamp); + self.modified.insert(key.clone(), new_timestamp); } // Delete files for previously deleted keys self.deleted_keys.iter().for_each(|key| { log::debug!("Deleting key: {}", key); self.data.remove(key); - self.ram_timestamps.remove(key); - self.disk_timestamps.remove(key); - let file_path = self.path.join(format!("{}.bin", key)); + self.modified.remove(key); + self.written_to_disk.remove(key); + let file_path = self.path.join(format!("{}.txt", key)); if file_path.exists() { fs::remove_file(&file_path).expect("Failed to delete file"); } @@ -416,7 +406,7 @@ where } else { self.set(key.clone(), value.clone()) } - self.ram_timestamps + self.modified .insert(key.clone(), SystemTime::now()); } Ok(()) @@ -756,9 +746,9 @@ mod tests { key: &str, value: Dummy, ) -> Result<()> { - let mut file = File::create(path.join(format!("{}.bin", key)))?; - let bytes = bincode::serialize(&value).unwrap(); - file.write_all(&bytes)?; + let mut file = File::create(path.join(format!("{}.txt", key)))?; + file.write_all(serde_json::to_string_pretty(&value)?.as_bytes())?; + file.flush()?; let time = SystemTime::now(); file.set_modified(time).unwrap(); file.sync_all()?; From 887a0d27442720c3f524427160a11bc1b52bfeee Mon Sep 17 00:00:00 2001 From: pushkarm029 Date: Mon, 26 Aug 2024 21:54:18 +0530 Subject: [PATCH 14/16] remove bincode Signed-off-by: pushkarm029 --- fs-storage/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/fs-storage/Cargo.toml b/fs-storage/Cargo.toml index 2a07869e..31cb73fd 100644 --- a/fs-storage/Cargo.toml +++ b/fs-storage/Cargo.toml @@ -17,7 +17,6 @@ serde_json = "1.0.82" serde = { version = "1.0.138", features = ["derive"] } jni = { version = "0.21.1", optional = true } jnix = { version = "0.5.1", features = ["derive"], optional = true } -bincode = "1.3" data-error = { path = "../data-error" } From aba82ac5fee3c87e31bdf9c13a4e439feb60641d Mon Sep 17 00:00:00 2001 From: Ishan Bhanuka Date: Tue, 27 Aug 2024 11:34:41 +0530 Subject: [PATCH 15/16] Change folderstorage file extension to json --- fs-storage/src/folder_storage.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/fs-storage/src/folder_storage.rs b/fs-storage/src/folder_storage.rs index ea0af6a4..60d2dea8 100644 --- a/fs-storage/src/folder_storage.rs +++ b/fs-storage/src/folder_storage.rs @@ -91,7 +91,9 @@ where let entry = entry?; let path = entry.path(); if path.is_file() - && path.extension().map_or(false, |ext| ext == "txt") + && path + .extension() + .map_or(false, |ext| ext == "json") { let key: K = self.extract_key_from_file_path(&path)?; let file = File::open(&path)?; @@ -152,7 +154,7 @@ where /// Remove files from disk that are not present in memory fn remove_files_not_in_ram(&mut self) -> Result<()> { for key in self.deleted_keys.iter() { - let file_path = self.path.join(format!("{}.txt", key)); + let file_path = self.path.join(format!("{}.json", key)); if file_path.exists() { fs::remove_file(&file_path).expect("Failed to delete file"); } @@ -224,7 +226,7 @@ where let mut disk_newer = false; for key in self.data.keys() { - let file_path = self.path.join(format!("{}.txt", key)); + let file_path = self.path.join(format!("{}.json", key)); let ram_timestamp = self .modified .get(key) @@ -284,7 +286,9 @@ where let entry = entry?; let path = entry.path(); if path.is_file() - && path.extension().map_or(false, |ext| ext == "txt") + && path + .extension() + .map_or(false, |ext| ext == "json") { let key = self.extract_key_from_file_path(&path)?; if !self.data.contains_key(&key) @@ -348,7 +352,7 @@ where fs::create_dir_all(&self.path)?; for (key, value) in &self.data { - let file_path = self.path.join(format!("{}.txt", key)); + let file_path = self.path.join(format!("{}.json", key)); let mut file = File::create(&file_path)?; file.write_all(serde_json::to_string_pretty(&value)?.as_bytes())?; file.flush()?; @@ -368,7 +372,7 @@ where self.data.remove(key); self.modified.remove(key); self.written_to_disk.remove(key); - let file_path = self.path.join(format!("{}.txt", key)); + let file_path = self.path.join(format!("{}.json", key)); if file_path.exists() { fs::remove_file(&file_path).expect("Failed to delete file"); } @@ -746,7 +750,7 @@ mod tests { key: &str, value: Dummy, ) -> Result<()> { - let mut file = File::create(path.join(format!("{}.txt", key)))?; + let mut file = File::create(path.join(format!("{}.json", key)))?; file.write_all(serde_json::to_string_pretty(&value)?.as_bytes())?; file.flush()?; let time = SystemTime::now(); From f607c3ed0252e1d403ffda013c092aab64cb9f26 Mon Sep 17 00:00:00 2001 From: pushkarm029 Date: Sat, 31 Aug 2024 17:25:22 +0530 Subject: [PATCH 16/16] fix Signed-off-by: pushkarm029 --- fs-storage/Cargo.toml | 1 - fs-storage/examples/cli.rs | 2 +- fs-storage/src/folder_storage.rs | 148 ++++++++++++++++++------------- 3 files changed, 87 insertions(+), 64 deletions(-) diff --git a/fs-storage/Cargo.toml b/fs-storage/Cargo.toml index 31cb73fd..de74b108 100644 --- a/fs-storage/Cargo.toml +++ b/fs-storage/Cargo.toml @@ -25,7 +25,6 @@ anyhow = "1.0.81" quickcheck = { version = "1.0.3", features = ["use_logging"] } quickcheck_macros = "1.0.0" tempdir = "0.3.7" -test-log = "0.2.16" [features] default = ["jni-bindings"] diff --git a/fs-storage/examples/cli.rs b/fs-storage/examples/cli.rs index aaa4ad4c..40d373a6 100644 --- a/fs-storage/examples/cli.rs +++ b/fs-storage/examples/cli.rs @@ -130,7 +130,7 @@ fn file_write_command(args: &[String], path: &str) -> Result<()> { } fn folder_read_command(args: &[String], path: &str) -> Result<()> { - let keys = if args.len() > 3 { + let keys = if args.len() > 4 { args[4] .split(',') .map(|s| s.to_string()) diff --git a/fs-storage/src/folder_storage.rs b/fs-storage/src/folder_storage.rs index 60d2dea8..68892a49 100644 --- a/fs-storage/src/folder_storage.rs +++ b/fs-storage/src/folder_storage.rs @@ -17,12 +17,12 @@ pub struct FolderStorage { label: String, /// Path to the underlying folder where data is persisted path: PathBuf, - /// `modified` can be used to track the last time a file was modified in memory. + /// `timestamps.0` can be used to track the last time a file was modified in memory. /// where the key is the path of the file inside the directory. - modified: BTreeMap, - /// `written_to_disk` can be used to track the last time a file written or read from disk. + /// + /// `timestamps.1` can be used to track the last time a file written or read from disk. /// where the key is the path of the file inside the directory. - written_to_disk: BTreeMap, + timestamps: BTreeMap, data: BTreeMap, /// Temporary store for deleted keys until storage is synced deleted_keys: BTreeSet, @@ -48,16 +48,13 @@ where V: Clone + serde::Serialize + serde::de::DeserializeOwned + Monoid, { /// Create a new folder storage with a diagnostic label and directory path - /// The storage will be initialized using the disk data, if the path exists - /// /// Note: if the folder storage already exists, the data will be read from the folder /// without overwriting it. pub fn new(label: String, path: &Path) -> Result { let mut storage = Self { label, path: PathBuf::from(path), - modified: BTreeMap::new(), - written_to_disk: BTreeMap::new(), + timestamps: BTreeMap::new(), data: BTreeMap::new(), deleted_keys: BTreeSet::new(), }; @@ -95,7 +92,7 @@ where .extension() .map_or(false, |ext| ext == "json") { - let key: K = self.extract_key_from_file_path(&path)?; + let key: K = extract_key_from_file_path(&self.label, &path)?; let file = File::open(&path)?; let value: V = serde_json::from_reader(file).map_err(|err| { @@ -107,32 +104,41 @@ where data.insert(key.clone(), value); - if let Ok(metadata) = fs::metadata(&path) { - if let Ok(modified) = metadata.modified() { - self.written_to_disk.insert(key.clone(), modified); - self.modified.insert(key, modified); - } - } + let modified = fs::metadata(path) + .map_err(|_| { + ArklibError::Storage( + self.label.clone(), + "Failed to fetch metadata".to_owned(), + ) + })? + .modified() + .map_err(|_| { + ArklibError::Storage( + self.label.clone(), + "Failed to fetch modified time from metadata" + .to_owned(), + ) + })?; + + self.timestamps.insert(key, (modified, modified)); } } self.data = data; Ok(()) } - /// Resolve differences between memory and disk data + + /// Resolves discrepancies between in-memory data and disk data by combining or + /// overwriting values based on which version is more recent, ensuring consistency. fn resolve_divergence(&mut self) -> Result<()> { let new_data = FolderStorage::new("new_data".into(), &self.path)?; for (key, new_value) in new_data.data.iter() { if let Some(existing_value) = self.data.get(key) { let existing_value_updated = self - .modified + .timestamps .get(key) - .and_then(|ram_stamp| { - self.written_to_disk - .get(key) - .map(|disk_stamp| ram_stamp > disk_stamp) - }) + .map(|timestamp| timestamp.0 > timestamp.1) .unwrap_or(false); // Use monoid to combine value for the given key @@ -156,35 +162,13 @@ where for key in self.deleted_keys.iter() { let file_path = self.path.join(format!("{}.json", key)); if file_path.exists() { - fs::remove_file(&file_path).expect("Failed to delete file"); + fs::remove_file(&file_path).unwrap_or_else(|_| { + panic!("Failed to delete file at {:?}", file_path) + }) } } Ok(()) } - - pub fn extract_key_from_file_path(&self, path: &Path) -> Result { - path.file_stem() - .ok_or_else(|| { - ArklibError::Storage( - self.label.clone(), - "Failed to extract file stem from filename".to_owned(), - ) - })? - .to_str() - .ok_or_else(|| { - ArklibError::Storage( - self.label.clone(), - "Failed to convert file stem to string".to_owned(), - ) - })? - .parse::() - .map_err(|_| { - ArklibError::Storage( - self.label.clone(), - "Failed to parse key from filename".to_owned(), - ) - }) - } } impl BaseStorage for FolderStorage @@ -201,7 +185,12 @@ where fn set(&mut self, key: K, value: V) { self.data.insert(key.clone(), value); self.deleted_keys.remove(&key); - self.modified.insert(key, SystemTime::now()); + self.timestamps + .entry(key) + .and_modify(|timestamp| { + timestamp.0 = SystemTime::now(); + }) + .or_insert((SystemTime::now(), SystemTime::now())); } /// Remove an entry from the internal mapping given a key @@ -218,9 +207,9 @@ where } } - /// Compare the timestamp of the storage files - /// with the timestamps of the in-memory storage and the last written - /// to time to determine if either of the two requires syncing. + /// Determines the synchronization status between RAM and disk. + /// Compares modification timestamps of files in RAM and on disk, + /// checking for newer versions in path. fn sync_status(&mut self) -> Result { let mut ram_newer = !self.deleted_keys.is_empty(); let mut disk_newer = false; @@ -228,9 +217,10 @@ where for key in self.data.keys() { let file_path = self.path.join(format!("{}.json", key)); let ram_timestamp = self - .modified + .timestamps .get(key) - .expect("Data entry key should have ram timestamp"); + .map(|(ram_time, _)| ram_time) + .expect("Data entry key should have a RAM timestamp"); if let Ok(metadata) = fs::metadata(&file_path) { if let Ok(disk_timestamp) = metadata.modified() { @@ -290,7 +280,7 @@ where .extension() .map_or(false, |ext| ext == "json") { - let key = self.extract_key_from_file_path(&path)?; + let key = extract_key_from_file_path(&self.label, &path)?; if !self.data.contains_key(&key) && !self.deleted_keys.contains(&key) { @@ -315,7 +305,12 @@ where /// Sync the in-memory storage with the storage on disk fn sync(&mut self) -> Result<()> { match self.sync_status()? { - SyncStatus::InSync => {} + SyncStatus::InSync => { + log::info!( + "Memory is synchronized with the storage, {}", + self.label + ); + } SyncStatus::MappingStale => { self.read_fs()?; } @@ -361,17 +356,15 @@ where file.set_modified(new_timestamp)?; file.sync_all()?; - self.written_to_disk - .insert(key.clone(), new_timestamp); - self.modified.insert(key.clone(), new_timestamp); + self.timestamps + .insert(key.clone(), (new_timestamp, new_timestamp)); } // Delete files for previously deleted keys self.deleted_keys.iter().for_each(|key| { log::debug!("Deleting key: {}", key); self.data.remove(key); - self.modified.remove(key); - self.written_to_disk.remove(key); + self.timestamps.remove(key); let file_path = self.path.join(format!("{}.json", key)); if file_path.exists() { fs::remove_file(&file_path).expect("Failed to delete file"); @@ -410,13 +403,44 @@ where } else { self.set(key.clone(), value.clone()) } - self.modified - .insert(key.clone(), SystemTime::now()); + self.timestamps + .entry(key.clone()) + .and_modify(|timestamp| { + timestamp.0 = SystemTime::now(); + }) + .or_insert((SystemTime::now(), SystemTime::now())); } Ok(()) } } +fn extract_key_from_file_path(label: &str, path: &Path) -> Result +where + K: std::str::FromStr, +{ + path.file_stem() + .ok_or_else(|| { + ArklibError::Storage( + label.to_owned(), + "Failed to extract file stem from filename".to_owned(), + ) + })? + .to_str() + .ok_or_else(|| { + ArklibError::Storage( + label.to_owned(), + "Failed to convert file stem to string".to_owned(), + ) + })? + .parse::() + .map_err(|_| { + ArklibError::Storage( + label.to_owned(), + "Failed to parse key from filename".to_owned(), + ) + }) +} + #[cfg(test)] mod tests { use crate::{