From d1405f38bc6d51fcb1ca14fb4c506f8b27a2e70f Mon Sep 17 00:00:00 2001
From: Elias Rohrer <dev@tnull.de>
Date: Tue, 1 Aug 2023 10:24:53 +0200
Subject: [PATCH 1/9] Add `KVStore` interface trait

We upstream the `KVStore` interface trait from LDK Node, which will
replace `KVStorePersister` in the coming commits.

Besides persistence, `KVStore` implementations also allow users to `list` the
keys present in a given `namespace` and to `read` the stored values.
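
For illustration, a minimal in-memory implementation of the trait as introduced
below could look as follows. This `InMemoryStore` is a hypothetical sketch for
this commit message only (assuming the `std` feature, where `lightning::io`
matches `std::io`), and it elides the namespace/key validation a real backend
should perform:

  use lightning::io;
  use lightning::util::persist::KVStore;
  use std::collections::HashMap;
  use std::sync::Mutex;

  // Entries are grouped by `(namespace, sub_namespace)` so that per-namespace
  // key uniqueness holds, mirroring the segmentation described in the docs.
  struct InMemoryStore {
      entries: Mutex<HashMap<(String, String), HashMap<String, Vec<u8>>>>,
  }

  impl KVStore for InMemoryStore {
      fn read(&self, namespace: &str, sub_namespace: &str, key: &str) -> io::Result<Vec<u8>> {
          let entries = self.entries.lock().unwrap();
          entries.get(&(namespace.to_string(), sub_namespace.to_string()))
              .and_then(|kvs| kvs.get(key).cloned())
              .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "key not found"))
      }
      fn write(&self, namespace: &str, sub_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> {
          let mut entries = self.entries.lock().unwrap();
          entries.entry((namespace.to_string(), sub_namespace.to_string()))
              .or_default()
              .insert(key.to_string(), buf.to_vec());
          Ok(())
      }
      fn remove(&self, namespace: &str, sub_namespace: &str, key: &str, _lazy: bool) -> io::Result<()> {
          let mut entries = self.entries.lock().unwrap();
          if let Some(kvs) = entries.get_mut(&(namespace.to_string(), sub_namespace.to_string())) {
              kvs.remove(key);
          }
          Ok(())
      }
      fn list(&self, namespace: &str, sub_namespace: &str) -> io::Result<Vec<String>> {
          let entries = self.entries.lock().unwrap();
          Ok(entries.get(&(namespace.to_string(), sub_namespace.to_string()))
              .map(|kvs| kvs.keys().cloned().collect())
              .unwrap_or_default())
      }
  }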
---
 lightning/src/util/persist.rs | 92 ++++++++++++++++++++++++++++++++++-
 pending_changelog/kvstore.txt |  3 ++
 2 files changed, 93 insertions(+), 2 deletions(-)
 create mode 100644 pending_changelog/kvstore.txt

diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs
index 435ef30d331..35d19eea46b 100644
--- a/lightning/src/util/persist.rs
+++ b/lightning/src/util/persist.rs
@@ -4,13 +4,14 @@
 // You may not use this file except in accordance with one or both of these
 // licenses.
 
-//! This module contains a simple key-value store trait KVStorePersister that
+//! This module contains a simple key-value store trait [`KVStore`] that
 //! allows one to implement the persistence for [`ChannelManager`], [`NetworkGraph`],
 //! and [`ChannelMonitor`] all in one place.
 
 use core::ops::Deref;
 use bitcoin::hashes::hex::ToHex;
 use crate::io;
+use crate::prelude::{Vec, String};
 use crate::routing::scoring::WriteableScore;
 
 use crate::chain;
@@ -22,7 +23,94 @@ use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate};
 use crate::ln::channelmanager::ChannelManager;
 use crate::routing::router::Router;
 use crate::routing::gossip::NetworkGraph;
-use super::{logger::Logger, ser::Writeable};
+use crate::util::logger::Logger;
+use crate::util::ser::Writeable;
+
+/// The alphabet of characters allowed for namespaces and keys.
+pub const KVSTORE_NAMESPACE_KEY_ALPHABET: &str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-";
+
+/// The maximum number of characters namespaces and keys may have.
+pub const KVSTORE_NAMESPACE_KEY_MAX_LEN: usize = 120;
+
+/// The namespace under which the [`ChannelManager`] will be persisted.
+pub const CHANNEL_MANAGER_PERSISTENCE_NAMESPACE: &str = "";
+/// The sub-namespace under which the [`ChannelManager`] will be persisted.
+pub const CHANNEL_MANAGER_PERSISTENCE_SUB_NAMESPACE: &str = "";
+/// The key under which the [`ChannelManager`] will be persisted.
+pub const CHANNEL_MANAGER_PERSISTENCE_KEY: &str = "manager";
+
+/// The namespace under which [`ChannelMonitor`]s will be persisted.
+pub const CHANNEL_MONITOR_PERSISTENCE_NAMESPACE: &str = "monitors";
+/// The sub-namespace under which [`ChannelMonitor`]s will be persisted.
+pub const CHANNEL_MONITOR_PERSISTENCE_SUB_NAMESPACE: &str = "";
+
+/// The namespace under which the [`NetworkGraph`] will be persisted.
+pub const NETWORK_GRAPH_PERSISTENCE_NAMESPACE: &str = "";
+/// The sub-namespace under which the [`NetworkGraph`] will be persisted.
+pub const NETWORK_GRAPH_PERSISTENCE_SUB_NAMESPACE: &str = "";
+/// The key under which the [`NetworkGraph`] will be persisted.
+pub const NETWORK_GRAPH_PERSISTENCE_KEY: &str = "network_graph";
+
+/// The namespace under which the [`WriteableScore`] will be persisted.
+pub const SCORER_PERSISTENCE_NAMESPACE: &str = "";
+/// The sub-namespace under which the [`WriteableScore`] will be persisted.
+pub const SCORER_PERSISTENCE_SUB_NAMESPACE: &str = "";
+/// The key under which the [`WriteableScore`] will be persisted.
+pub const SCORER_PERSISTENCE_KEY: &str = "scorer";
+
+/// Provides an interface that allows storage and retrieval of persisted values that are associated
+/// with given keys.
+///
+/// In order to avoid collisions, the key space is segmented based on the given `namespace`s and
+/// `sub_namespace`s. Implementations of this trait are free to handle them in different ways, as
+/// long as per-namespace key uniqueness is ensured.
+///
+/// Keys and namespaces are required to be valid ASCII strings in the range of
+/// [`KVSTORE_NAMESPACE_KEY_ALPHABET`] and no longer than [`KVSTORE_NAMESPACE_KEY_MAX_LEN`]. Empty
+/// namespaces and sub-namespaces (`""`) are considered valid; however, if `namespace` is
+/// empty, `sub_namespace` is required to be empty, too. This means that concerns should always be
+/// separated by namespace first, before sub-namespaces are used. While the number of namespaces
+/// will be relatively small and is determined at compile time, there may be many sub-namespaces
+/// per namespace. Note that, within any given namespace/sub-namespace, uniqueness needs to hold
+/// not only across keys but also across the names of nested namespaces/sub-namespaces, i.e.,
+/// conflicts between keys and equally named namespaces/sub-namespaces must be avoided.
+///
+/// **Note:** Users migrating custom persistence backends from the pre-v0.0.117 `KVStorePersister`
+/// interface can use a concatenation of `[{namespace}/[{sub_namespace}/]]{key}` to recover a
+/// `key` compatible with the data model previously assumed by `KVStorePersister::persist`.
+pub trait KVStore {
+	/// Returns the data stored for the given `namespace`, `sub_namespace`, and `key`.
+	///
+	/// Returns an [`ErrorKind::NotFound`] if the given `key` could not be found in the given
+	/// `namespace` and `sub_namespace`.
+	///
+	/// [`ErrorKind::NotFound`]: io::ErrorKind::NotFound
+	fn read(&self, namespace: &str, sub_namespace: &str, key: &str) -> io::Result<Vec<u8>>;
+	/// Persists the given data under the given `key`.
+	///
+	/// Will create the given `namespace` and `sub_namespace` if not already present in the store.
+	fn write(&self, namespace: &str, sub_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()>;
+	/// Removes any data that had previously been persisted under the given `key`.
+	///
+	/// If the `lazy` flag is set to `true`, the backend implementation might choose to lazily
+	/// remove the given `key` at some point in time after the method returns, e.g., as part of an
+	/// eventual batch deletion of multiple keys. As a consequence, subsequent calls to
+	/// [`KVStore::list`] might include the removed key until the changes are actually persisted.
+	///
+	/// Note that while setting the `lazy` flag reduces the I/O burden of multiple subsequent
+	/// `remove` calls, it also influences the atomicity guarantees as lazy `remove`s could
+	/// potentially get lost on crash after the method returns. Therefore, this flag should only be
+	/// set for `remove` operations that can be safely replayed at a later time.
+	///
+	/// Returns successfully if no data will be stored for the given `namespace`, `sub_namespace`, and
+	/// `key`, independently of whether it was present before its invocation or not.
+	fn remove(&self, namespace: &str, sub_namespace: &str, key: &str, lazy: bool) -> io::Result<()>;
+	/// Returns a list of keys that are stored under the given `sub_namespace` in `namespace`.
+	///
+	/// Returns the keys in arbitrary order, so users requiring a particular order need to sort the
+	/// returned keys. Returns an empty list if `namespace` or `sub_namespace` is unknown.
+	fn list(&self, namespace: &str, sub_namespace: &str) -> io::Result<Vec<String>>;
+}
 
 /// Trait for a key-value store for persisting some writeable object at some key
 /// Implementing `KVStorePersister` provides auto-implementations for [`Persister`]
diff --git a/pending_changelog/kvstore.txt b/pending_changelog/kvstore.txt
new file mode 100644
index 00000000000..d96fd69371b
--- /dev/null
+++ b/pending_changelog/kvstore.txt
@@ -0,0 +1,3 @@
+## Backwards Compatibility
+
+* Users migrating custom persistence backends from the pre-v0.0.117 `KVStorePersister` interface can use a concatenation of `[{namespace}/[{sub_namespace}/]]{key}` to recover a `key` compatible with the data model previously assumed by `KVStorePersister::persist`.

From 931ea2667a39c5adc4c6268120a5446eb8b07a04 Mon Sep 17 00:00:00 2001
From: Elias Rohrer <dev@tnull.de>
Date: Mon, 21 Aug 2023 13:13:56 +0200
Subject: [PATCH 2/9] Update `lightning-persister` crate

---
 lightning-persister/Cargo.toml | 4 ++--
 lightning-persister/src/lib.rs | 6 +++---
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/lightning-persister/Cargo.toml b/lightning-persister/Cargo.toml
index 35bddc07746..a6775a18cff 100644
--- a/lightning-persister/Cargo.toml
+++ b/lightning-persister/Cargo.toml
@@ -3,9 +3,9 @@ name = "lightning-persister"
 version = "0.0.116"
 authors = ["Valentine Wallace", "Matt Corallo"]
 license = "MIT OR Apache-2.0"
-repository = "https://github.com/lightningdevkit/rust-lightning/"
+repository = "https://github.com/lightningdevkit/rust-lightning"
 description = """
-Utilities to manage Rust-Lightning channel data persistence and retrieval.
+Utilities for LDK data persistence and retrieval.
 """
 edition = "2018"
 
diff --git a/lightning-persister/src/lib.rs b/lightning-persister/src/lib.rs
index b34fe895b47..dc205feba4e 100644
--- a/lightning-persister/src/lib.rs
+++ b/lightning-persister/src/lib.rs
@@ -1,6 +1,6 @@
-//! Utilities that handle persisting Rust-Lightning data to disk via standard filesystem APIs.
-
-// Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
+//! Provides utilities for LDK data persistence and retrieval.
+//
+// TODO: Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
 #![deny(broken_intra_doc_links)]
 #![deny(private_intra_doc_links)]
 

From c18f43fb07e7af7c0b8d73f92eef259f3b06f7e0 Mon Sep 17 00:00:00 2001
From: Elias Rohrer <dev@tnull.de>
Date: Fri, 4 Aug 2023 10:09:55 +0200
Subject: [PATCH 3/9] Add `test_utils`

We add a utility function needed by upcoming `KVStore` implementation
tests.
---
 lightning-persister/src/lib.rs        |  3 ++
 lightning-persister/src/test_utils.rs | 50 +++++++++++++++++++++++++++
 2 files changed, 53 insertions(+)
 create mode 100644 lightning-persister/src/test_utils.rs

diff --git a/lightning-persister/src/lib.rs b/lightning-persister/src/lib.rs
index dc205feba4e..0a667e4216f 100644
--- a/lightning-persister/src/lib.rs
+++ b/lightning-persister/src/lib.rs
@@ -10,6 +10,9 @@
 
 #[cfg(ldk_bench)] extern crate criterion;
 
+#[cfg(test)]
+mod test_utils;
+
 mod util;
 
 extern crate lightning;
diff --git a/lightning-persister/src/test_utils.rs b/lightning-persister/src/test_utils.rs
new file mode 100644
index 00000000000..357a8b2ee8c
--- /dev/null
+++ b/lightning-persister/src/test_utils.rs
@@ -0,0 +1,50 @@
+use lightning::util::persist::{KVStore, KVSTORE_NAMESPACE_KEY_MAX_LEN};
+
+use std::panic::RefUnwindSafe;
+
+pub(crate) fn do_read_write_remove_list_persist<K: KVStore + RefUnwindSafe>(kv_store: &K) {
+	let data = [42u8; 32];
+
+	let namespace = "testspace";
+	let sub_namespace = "testsubspace";
+	let key = "testkey";
+
+	// Test the basic KVStore operations.
+	kv_store.write(namespace, sub_namespace, key, &data).unwrap();
+
+	// Test that an empty namespace/sub_namespace is allowed, but that an empty namespace with a
+	// non-empty sub-namespace is rejected, as is an empty key.
+	kv_store.write("", "", key, &data).unwrap();
+	let res = std::panic::catch_unwind(|| kv_store.write("", sub_namespace, key, &data));
+	assert!(res.is_err());
+	let res = std::panic::catch_unwind(|| kv_store.write(namespace, sub_namespace, "", &data));
+	assert!(res.is_err());
+
+	let listed_keys = kv_store.list(namespace, sub_namespace).unwrap();
+	assert_eq!(listed_keys.len(), 1);
+	assert_eq!(listed_keys[0], key);
+
+	let read_data = kv_store.read(namespace, sub_namespace, key).unwrap();
+	assert_eq!(data, &*read_data);
+
+	kv_store.remove(namespace, sub_namespace, key, false).unwrap();
+
+	let listed_keys = kv_store.list(namespace, sub_namespace).unwrap();
+	assert_eq!(listed_keys.len(), 0);
+
+	// Ensure we have no issue operating with namespace/sub_namespace/key of length
+	// KVSTORE_NAMESPACE_KEY_MAX_LEN.
+	let max_chars: String = std::iter::repeat('A').take(KVSTORE_NAMESPACE_KEY_MAX_LEN).collect();
+	kv_store.write(&max_chars, &max_chars, &max_chars, &data).unwrap();
+
+	let listed_keys = kv_store.list(&max_chars, &max_chars).unwrap();
+	assert_eq!(listed_keys.len(), 1);
+	assert_eq!(listed_keys[0], max_chars);
+
+	let read_data = kv_store.read(&max_chars, &max_chars, &max_chars).unwrap();
+	assert_eq!(data, &*read_data);
+
+	kv_store.remove(&max_chars, &max_chars, &max_chars, false).unwrap();
+
+	let listed_keys = kv_store.list(&max_chars, &max_chars).unwrap();
+	assert_eq!(listed_keys.len(), 0);
+}

From f22d1b63902404a15ca808c977f8364cbf90d277 Mon Sep 17 00:00:00 2001
From: Elias Rohrer <dev@tnull.de>
Date: Tue, 1 Aug 2023 10:46:51 +0200
Subject: [PATCH 4/9] Add `FilesystemStore`

We upstream the `FilesystemStore` implementation, which is backwards
compatible with `lightning-persister::FilesystemPersister`.
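
As a rough usage sketch (not part of this patch; the path and values below are
made up for illustration), the store can be exercised through the `KVStore`
interface like so:

  use lightning::util::persist::KVStore;
  use lightning_persister::fs_store::FilesystemStore;

  fn main() -> std::io::Result<()> {
      // Values written under the "monitors" namespace end up as files in
      // `<data_dir>/monitors/`.
      let store = FilesystemStore::new("/tmp/ldk_store".into());
      store.write("monitors", "", "some_key", &[1, 2, 3])?;
      assert_eq!(store.read("monitors", "", "some_key")?, vec![1, 2, 3]);
      assert_eq!(store.list("monitors", "")?, vec!["some_key".to_string()]);
      store.remove("monitors", "", "some_key", false)?;
      Ok(())
  }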
---
 lightning-persister/Cargo.toml      |   1 +
 lightning-persister/src/fs_store.rs | 379 ++++++++++++++++++++++++++++
 lightning-persister/src/lib.rs      |   4 +
 lightning-persister/src/utils.rs    |  59 +++++
 4 files changed, 443 insertions(+)
 create mode 100644 lightning-persister/src/fs_store.rs
 create mode 100644 lightning-persister/src/utils.rs

diff --git a/lightning-persister/Cargo.toml b/lightning-persister/Cargo.toml
index a6775a18cff..271f3b882b3 100644
--- a/lightning-persister/Cargo.toml
+++ b/lightning-persister/Cargo.toml
@@ -20,6 +20,7 @@ libc = "0.2"
 
 [target.'cfg(windows)'.dependencies]
 winapi = { version = "0.3", features = ["winbase"] }
+windows-sys = { version = "0.48.0", default-features = false, features = ["Win32_Storage_FileSystem", "Win32_Foundation"] }
 
 [target.'cfg(ldk_bench)'.dependencies]
 criterion = { version = "0.4", optional = true, default-features = false }
diff --git a/lightning-persister/src/fs_store.rs b/lightning-persister/src/fs_store.rs
new file mode 100644
index 00000000000..f74806d7d8e
--- /dev/null
+++ b/lightning-persister/src/fs_store.rs
@@ -0,0 +1,379 @@
+//! Objects related to [`FilesystemStore`] live here.
+use crate::utils::{check_namespace_key_validity, is_valid_kvstore_str};
+
+use lightning::util::persist::KVStore;
+use lightning::util::string::PrintableString;
+
+use std::collections::HashMap;
+use std::fs;
+use std::io::{Read, Write};
+use std::path::{Path, PathBuf};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex, RwLock};
+
+#[cfg(target_os = "windows")]
+use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt};
+
+#[cfg(target_os = "windows")]
+macro_rules! call {
+	($e: expr) => {
+		if $e != 0 {
+			Ok(())
+		} else {
+			Err(std::io::Error::last_os_error())
+		}
+	};
+}
+
+#[cfg(target_os = "windows")]
+fn path_to_windows_str<T: AsRef<OsStr>>(path: T) -> Vec<u16> {
+	path.as_ref().encode_wide().chain(Some(0)).collect()
+}
+
+// The number of read/write/remove/list operations after which we clean up our `locks` HashMap.
+const GC_LOCK_INTERVAL: usize = 25;
+
+/// A [`KVStore`] implementation that writes to and reads from the file system.
+pub struct FilesystemStore {
+	data_dir: PathBuf,
+	tmp_file_counter: AtomicUsize,
+	gc_counter: AtomicUsize,
+	locks: Mutex<HashMap<PathBuf, Arc<RwLock<()>>>>,
+}
+
+impl FilesystemStore {
+	/// Constructs a new [`FilesystemStore`].
+	pub fn new(data_dir: PathBuf) -> Self {
+		let locks = Mutex::new(HashMap::new());
+		let tmp_file_counter = AtomicUsize::new(0);
+		let gc_counter = AtomicUsize::new(1);
+		Self { data_dir, tmp_file_counter, gc_counter, locks }
+	}
+
+	/// Returns the data directory.
+	pub fn get_data_dir(&self) -> PathBuf {
+		self.data_dir.clone()
+	}
+
+	fn garbage_collect_locks(&self) {
+		let gc_counter = self.gc_counter.fetch_add(1, Ordering::AcqRel);
+
+		if gc_counter % GC_LOCK_INTERVAL == 0 {
+			// Take outer lock for the cleanup.
+			let mut outer_lock = self.locks.lock().unwrap();
+
+			// Garbage collect all lock entries that are not referenced anymore.
+			outer_lock.retain(|_, v| Arc::strong_count(&v) > 1);
+		}
+	}
+
+	fn get_dest_dir_path(&self, namespace: &str, sub_namespace: &str) -> std::io::Result<PathBuf> {
+		let mut dest_dir_path = {
+			#[cfg(target_os = "windows")]
+			{
+				let data_dir = self.data_dir.clone();
+				fs::create_dir_all(data_dir.clone())?;
+				fs::canonicalize(data_dir)?
+			}
+			#[cfg(not(target_os = "windows"))]
+			{
+				self.data_dir.clone()
+			}
+		};
+
+		dest_dir_path.push(namespace);
+		if !sub_namespace.is_empty() {
+			dest_dir_path.push(sub_namespace);
+		}
+
+		Ok(dest_dir_path)
+	}
+}
+
+impl KVStore for FilesystemStore {
+	fn read(&self, namespace: &str, sub_namespace: &str, key: &str) -> std::io::Result<Vec<u8>> {
+		check_namespace_key_validity(namespace, sub_namespace, Some(key), "read")?;
+
+		let mut dest_file_path = self.get_dest_dir_path(namespace, sub_namespace)?;
+		dest_file_path.push(key);
+
+		let mut buf = Vec::new();
+		{
+			let inner_lock_ref = {
+				let mut outer_lock = self.locks.lock().unwrap();
+				Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
+			};
+			let _guard = inner_lock_ref.read().unwrap();
+
+			let mut f = fs::File::open(dest_file_path)?;
+			f.read_to_end(&mut buf)?;
+		}
+
+		self.garbage_collect_locks();
+
+		Ok(buf)
+	}
+
+	fn write(&self, namespace: &str, sub_namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> {
+		check_namespace_key_validity(namespace, sub_namespace, Some(key), "write")?;
+
+		let mut dest_file_path = self.get_dest_dir_path(namespace, sub_namespace)?;
+		dest_file_path.push(key);
+
+		let parent_directory = dest_file_path
+			.parent()
+			.ok_or_else(|| {
+				let msg =
+					format!("Could not retrieve parent directory of {}.", dest_file_path.display());
+				std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
+			})?;
+		fs::create_dir_all(&parent_directory)?;
+
+		// Do a crazy dance with lots of fsync()s to be overly cautious here...
+		// We never want to end up in a state where we've lost the old data, or end up using the
+		// old data on power loss after we've returned.
+		// The way to atomically write a file on Unix platforms is:
+		// open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
+		let mut tmp_file_path = dest_file_path.clone();
+		let tmp_file_ext = format!("{}.tmp", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel));
+		tmp_file_path.set_extension(tmp_file_ext);
+
+		{
+			let mut tmp_file = fs::File::create(&tmp_file_path)?;
+			tmp_file.write_all(&buf)?;
+			tmp_file.sync_all()?;
+		}
+
+		let res = {
+			let inner_lock_ref = {
+				let mut outer_lock = self.locks.lock().unwrap();
+				Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
+			};
+			let _guard = inner_lock_ref.write().unwrap();
+
+			#[cfg(not(target_os = "windows"))]
+			{
+				fs::rename(&tmp_file_path, &dest_file_path)?;
+				let dir_file = fs::OpenOptions::new().read(true).open(&parent_directory)?;
+				dir_file.sync_all()?;
+				Ok(())
+			}
+
+			#[cfg(target_os = "windows")]
+			{
+				let res = if dest_file_path.exists() {
+					call!(unsafe {
+						windows_sys::Win32::Storage::FileSystem::ReplaceFileW(
+							path_to_windows_str(dest_file_path.clone()).as_ptr(),
+							path_to_windows_str(tmp_file_path).as_ptr(),
+							std::ptr::null(),
+							windows_sys::Win32::Storage::FileSystem::REPLACEFILE_IGNORE_MERGE_ERRORS,
+							std::ptr::null_mut() as *const core::ffi::c_void,
+							std::ptr::null_mut() as *const core::ffi::c_void,
+							)
+					})
+				} else {
+					call!(unsafe {
+						windows_sys::Win32::Storage::FileSystem::MoveFileExW(
+							path_to_windows_str(tmp_file_path).as_ptr(),
+							path_to_windows_str(dest_file_path.clone()).as_ptr(),
+							windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH
+							| windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING,
+							)
+					})
+				};
+
+				match res {
+					Ok(()) => {
+						// We fsync the dest file in hopes this will also flush the metadata to disk.
+						let dest_file = fs::OpenOptions::new().read(true).write(true)
+							.open(&dest_file_path)?;
+						dest_file.sync_all()?;
+						Ok(())
+					}
+					Err(e) => Err(e),
+				}
+			}
+		};
+
+		self.garbage_collect_locks();
+
+		res
+	}
+
+	fn remove(&self, namespace: &str, sub_namespace: &str, key: &str, lazy: bool) -> std::io::Result<()> {
+		check_namespace_key_validity(namespace, sub_namespace, Some(key), "remove")?;
+
+		let mut dest_file_path = self.get_dest_dir_path(namespace, sub_namespace)?;
+		dest_file_path.push(key);
+
+		if !dest_file_path.is_file() {
+			return Ok(());
+		}
+
+		{
+			let inner_lock_ref = {
+				let mut outer_lock = self.locks.lock().unwrap();
+				Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
+			};
+			let _guard = inner_lock_ref.write().unwrap();
+
+			if lazy {
+				// If we're lazy we just call remove and be done with it.
+				fs::remove_file(&dest_file_path)?;
+			} else {
+				// If we're not lazy we try our best to persist the updated metadata to ensure
+				// atomicity of this call.
+				#[cfg(not(target_os = "windows"))]
+				{
+					fs::remove_file(&dest_file_path)?;
+
+					let parent_directory = dest_file_path.parent().ok_or_else(|| {
+						let msg =
+							format!("Could not retrieve parent directory of {}.", dest_file_path.display());
+						std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
+					})?;
+					let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?;
+					// The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes
+					// to the inode might get cached (and hence possibly lost on crash), depending on
+					// the target platform and file system.
+					//
+					// In order to assert we permanently removed the file in question we therefore
+					// call `fsync` on the parent directory on platforms that support it.
+					dir_file.sync_all()?;
+				}
+
+				#[cfg(target_os = "windows")]
+				{
+					// Since a deletion via the Windows `DeleteFile` API is not persisted until the
+					// last open file handle is dropped, and there seemingly is no reliable way to
+					// flush the directory metadata, we fall back to a 'recycling bin' model here,
+					// i.e., we first move the file to be deleted to a temporary trash file and
+					// remove the latter file afterwards.
+					//
+					// This should be marginally better, as, according to the documentation,
+					// `MoveFileExW` APIs should offer stronger persistence guarantees,
+					// at least if `MOVEFILE_WRITE_THROUGH`/`MOVEFILE_REPLACE_EXISTING` is set.
+					// However, all this is partially based on assumptions and local experiments, as
+					// Windows API is horribly underdocumented.
+					let mut trash_file_path = dest_file_path.clone();
+					let trash_file_ext = format!("{}.trash",
+						self.tmp_file_counter.fetch_add(1, Ordering::AcqRel));
+					trash_file_path.set_extension(trash_file_ext);
+
+					call!(unsafe {
+						windows_sys::Win32::Storage::FileSystem::MoveFileExW(
+							path_to_windows_str(dest_file_path).as_ptr(),
+							path_to_windows_str(trash_file_path.clone()).as_ptr(),
+							windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH
+							| windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING,
+							)
+					})?;
+
+					{
+						// We fsync the trash file in hopes this will also flush the original's file
+						// metadata to disk.
+						let trash_file = fs::OpenOptions::new().read(true).write(true)
+							.open(&trash_file_path.clone())?;
+						trash_file.sync_all()?;
+					}
+
+					// We're fine if this remove fails, as the trash file will eventually be
+					// cleaned up by `list`.
+					fs::remove_file(trash_file_path).ok();
+				}
+			}
+		}
+
+		self.garbage_collect_locks();
+
+		Ok(())
+	}
+
+	fn list(&self, namespace: &str, sub_namespace: &str) -> std::io::Result<Vec<String>> {
+		check_namespace_key_validity(namespace, sub_namespace, None, "list")?;
+
+		let prefixed_dest = self.get_dest_dir_path(namespace, sub_namespace)?;
+		let mut keys = Vec::new();
+
+		if !Path::new(&prefixed_dest).exists() {
+			return Ok(Vec::new());
+		}
+
+		for entry in fs::read_dir(&prefixed_dest)? {
+			let entry = entry?;
+			let p = entry.path();
+
+			if let Some(ext) = p.extension() {
+				#[cfg(target_os = "windows")]
+				{
+					// Clean up any trash files lying around.
+					if ext == "trash" {
+						fs::remove_file(p).ok();
+						continue;
+					}
+				}
+				if ext == "tmp" {
+					continue;
+				}
+			}
+
+			let metadata = p.metadata()?;
+
+			// We allow the presence of directories in the empty namespace and just skip them.
+			if metadata.is_dir() {
+				continue;
+			}
+
+			// If we otherwise don't find a file at the given path something went wrong.
+			if !metadata.is_file() {
+				debug_assert!(false, "Failed to list keys of {}/{}: file couldn't be accessed.",
+					PrintableString(namespace), PrintableString(sub_namespace));
+				let msg = format!("Failed to list keys of {}/{}: file couldn't be accessed.",
+					PrintableString(namespace), PrintableString(sub_namespace));
+				return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
+			}
+
+			match p.strip_prefix(&prefixed_dest) {
+				Ok(stripped_path) => {
+					if let Some(relative_path) = stripped_path.to_str() {
+						if is_valid_kvstore_str(relative_path) {
+							keys.push(relative_path.to_string())
+						}
+					} else {
+						debug_assert!(false, "Failed to list keys of {}/{}: file path is not valid UTF-8",
+							PrintableString(namespace), PrintableString(sub_namespace));
+						let msg = format!("Failed to list keys of {}/{}: file path is not valid UTF-8",
+							PrintableString(namespace), PrintableString(sub_namespace));
+						return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
+					}
+				}
+				Err(e) => {
+					debug_assert!(false, "Failed to list keys of {}/{}: {}",
+						PrintableString(namespace), PrintableString(sub_namespace), e);
+					let msg = format!("Failed to list keys of {}/{}: {}",
+						PrintableString(namespace), PrintableString(sub_namespace), e);
+					return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
+				}
+			}
+		}
+
+		self.garbage_collect_locks();
+
+		Ok(keys)
+	}
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+	use crate::test_utils::do_read_write_remove_list_persist;
+
+	#[test]
+	fn read_write_remove_list_persist() {
+		let mut temp_path = std::env::temp_dir();
+		temp_path.push("test_read_write_remove_list_persist");
+		let fs_store = FilesystemStore::new(temp_path);
+		do_read_write_remove_list_persist(&fs_store);
+	}
+}
diff --git a/lightning-persister/src/lib.rs b/lightning-persister/src/lib.rs
index 0a667e4216f..932e4f41ad3 100644
--- a/lightning-persister/src/lib.rs
+++ b/lightning-persister/src/lib.rs
@@ -10,6 +10,10 @@
 
 #[cfg(ldk_bench)] extern crate criterion;
 
+pub mod fs_store;
+
+mod utils;
+
 #[cfg(test)]
 mod test_utils;
 
diff --git a/lightning-persister/src/utils.rs b/lightning-persister/src/utils.rs
new file mode 100644
index 00000000000..54ec230de2d
--- /dev/null
+++ b/lightning-persister/src/utils.rs
@@ -0,0 +1,59 @@
+use lightning::util::persist::{KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN};
+use lightning::util::string::PrintableString;
+
+
+pub(crate) fn is_valid_kvstore_str(key: &str) -> bool {
+	key.len() <= KVSTORE_NAMESPACE_KEY_MAX_LEN && key.chars().all(|c| KVSTORE_NAMESPACE_KEY_ALPHABET.contains(c))
+}
+
+pub(crate) fn check_namespace_key_validity(namespace: &str, sub_namespace: &str, key: Option<&str>, operation: &str) -> Result<(), std::io::Error> {
+	if let Some(key) = key {
+		if key.is_empty() {
+			debug_assert!(false, "Failed to {} {}/{}/{}: key may not be empty.", operation,
+				PrintableString(namespace), PrintableString(sub_namespace), PrintableString(key));
+			let msg = format!("Failed to {} {}/{}/{}: key may not be empty.", operation,
+				PrintableString(namespace), PrintableString(sub_namespace), PrintableString(key));
+			return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
+		}
+
+		if namespace.is_empty() && !sub_namespace.is_empty() {
+			debug_assert!(false,
+				"Failed to {} {}/{}/{}: namespace may not be empty if a non-empty sub-namespace is given.",
+				operation,
+				PrintableString(namespace), PrintableString(sub_namespace), PrintableString(key));
+			let msg = format!(
+				"Failed to {} {}/{}/{}: namespace may not be empty if a non-empty sub-namespace is given.", operation,
+				PrintableString(namespace), PrintableString(sub_namespace), PrintableString(key));
+			return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
+		}
+
+		if !is_valid_kvstore_str(namespace) || !is_valid_kvstore_str(sub_namespace) || !is_valid_kvstore_str(key) {
+			debug_assert!(false, "Failed to {} {}/{}/{}: namespace, sub-namespace, and key must be valid.",
+				operation,
+				PrintableString(namespace), PrintableString(sub_namespace), PrintableString(key));
+			let msg = format!("Failed to {} {}/{}/{}: namespace, sub-namespace, and key must be valid.",
+				operation,
+				PrintableString(namespace), PrintableString(sub_namespace), PrintableString(key));
+			return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
+		}
+	} else {
+		if namespace.is_empty() && !sub_namespace.is_empty() {
+			debug_assert!(false,
+				"Failed to {} {}/{}: namespace may not be empty if a non-empty sub-namespace is given.",
+				operation, PrintableString(namespace), PrintableString(sub_namespace));
+			let msg = format!(
+				"Failed to {} {}/{}: namespace may not be empty if a non-empty sub-namespace is given.",
+				operation, PrintableString(namespace), PrintableString(sub_namespace));
+			return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
+		}
+		if !is_valid_kvstore_str(namespace) || !is_valid_kvstore_str(sub_namespace) {
+			debug_assert!(false, "Failed to {} {}/{}: namespace and sub-namespace must be valid.",
+				operation, PrintableString(namespace), PrintableString(sub_namespace));
+			let msg = format!("Failed to {} {}/{}: namespace and sub-namespace must be valid.",
+				operation, PrintableString(namespace), PrintableString(sub_namespace));
+			return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
+		}
+	}
+
+	Ok(())
+}

From 4305ee4106b6123a9cf632cb08a879a83de3a513 Mon Sep 17 00:00:00 2001
From: Elias Rohrer <dev@tnull.de>
Date: Fri, 4 Aug 2023 16:20:50 +0200
Subject: [PATCH 5/9] Add `read_channel_monitors` utility

This replaces the `FilesystemPersister::read_channelmonitors` method, as
we can now implement a single utility for all `KVStore`s.
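
A hedged sketch of how a node might load its monitors at startup with the new
utility (the seed, timestamps, and data directory below are placeholders, not
values from this patch):

  use lightning::sign::KeysManager;
  use lightning::util::persist::read_channel_monitors;
  use lightning_persister::fs_store::FilesystemStore;
  use std::sync::Arc;

  fn load_monitors() -> std::io::Result<()> {
      let store = Arc::new(FilesystemStore::new("/tmp/ldk_data".into()));
      // `KeysManager` implements both `EntropySource` and `SignerProvider`,
      // so it can serve as both arguments here.
      let keys_manager = Arc::new(KeysManager::new(&[42u8; 32], 0, 0));
      let monitors = read_channel_monitors(
          Arc::clone(&store), Arc::clone(&keys_manager), Arc::clone(&keys_manager))?;
      println!("Loaded {} channel monitor(s)", monitors.len());
      Ok(())
  }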
---
 lightning/src/util/persist.rs | 55 +++++++++++++++++++++++++++++++++--
 1 file changed, 53 insertions(+), 2 deletions(-)

diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs
index 35d19eea46b..5b122b7a570 100644
--- a/lightning/src/util/persist.rs
+++ b/lightning/src/util/persist.rs
@@ -9,7 +9,9 @@
 //! and [`ChannelMonitor`] all in one place.
 
 use core::ops::Deref;
-use bitcoin::hashes::hex::ToHex;
+use bitcoin::hashes::hex::{FromHex, ToHex};
+use bitcoin::{BlockHash, Txid};
+
 use crate::io;
 use crate::prelude::{Vec, String};
 use crate::routing::scoring::WriteableScore;
@@ -24,7 +26,7 @@ use crate::ln::channelmanager::ChannelManager;
 use crate::routing::router::Router;
 use crate::routing::gossip::NetworkGraph;
 use crate::util::logger::Logger;
-use crate::util::ser::Writeable;
+use crate::util::ser::{ReadableArgs, Writeable};
 
 /// The alphabet of characters allowed for namespaces and keys.
 pub const KVSTORE_NAMESPACE_KEY_ALPHABET: &str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-";
@@ -190,3 +192,52 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStorePersister> Persist<Ch
 		}
 	}
 }
+
+/// Read previously persisted [`ChannelMonitor`]s from the store.
+pub fn read_channel_monitors<K: Deref, ES: Deref, SP: Deref>(
+	kv_store: K, entropy_source: ES, signer_provider: SP,
+) -> io::Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>>
+where
+	K::Target: KVStore,
+	ES::Target: EntropySource + Sized,
+	SP::Target: SignerProvider + Sized,
+{
+	let mut res = Vec::new();
+
+	for stored_key in kv_store.list(
+		CHANNEL_MONITOR_PERSISTENCE_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SUB_NAMESPACE)?
+	{
+		let txid = Txid::from_hex(stored_key.split_at(64).0).map_err(|_| {
+			io::Error::new(io::ErrorKind::InvalidData, "Invalid tx ID in stored key")
+		})?;
+
+		let index: u16 = stored_key.split_at(65).1.parse().map_err(|_| {
+			io::Error::new(io::ErrorKind::InvalidData, "Invalid tx index in stored key")
+		})?;
+
+		match <(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>::read(
+			&mut io::Cursor::new(
+				kv_store.read(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SUB_NAMESPACE, &stored_key)?),
+			(&*entropy_source, &*signer_provider),
+		) {
+			Ok((block_hash, channel_monitor)) => {
+				if channel_monitor.get_funding_txo().0.txid != txid
+					|| channel_monitor.get_funding_txo().0.index != index
+				{
+					return Err(io::Error::new(
+						io::ErrorKind::InvalidData,
+						"ChannelMonitor was stored under the wrong key",
+					));
+				}
+				res.push((block_hash, channel_monitor));
+			}
+			Err(_) => {
+				return Err(io::Error::new(
+					io::ErrorKind::InvalidData,
+					"Failed to deserialize ChannelMonitor"
+				))
+			}
+		}
+	}
+	Ok(res)
+}

From cc1b505b305c5339496d9aaca28c73b083ba602f Mon Sep 17 00:00:00 2001
From: Elias Rohrer <dev@tnull.de>
Date: Tue, 1 Aug 2023 13:37:46 +0200
Subject: [PATCH 6/9] Migrate to `KVStore`/`FilesystemStore`

Firstly, we switch our background processor over to use `FilesystemStore`, which
also gives us test coverage and ensures compatibility.

Then, we remove the superseded `KVStorePersister` trait and the
`FilesystemPersister` code.
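
For users migrating a custom `KVStorePersister` backend, the legacy key for a
given `namespace`/`sub_namespace`/`key` triple can be reconstructed following
the `[{namespace}/[{sub_namespace}/]]{key}` convention from the `KVStore` docs.
A small illustrative helper (an assumption for this commit message, not shipped
code):

  fn legacy_kvstorepersister_key(namespace: &str, sub_namespace: &str, key: &str) -> String {
      // E.g. "manager" for the `ChannelManager` (empty namespace) and
      // "monitors/<funding_txo>" for `ChannelMonitor`s.
      match (namespace.is_empty(), sub_namespace.is_empty()) {
          (true, _) => key.to_string(),
          (false, true) => format!("{}/{}", namespace, key),
          (false, false) => format!("{}/{}/{}", namespace, sub_namespace, key),
      }
  }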
---
 bench/benches/bench.rs                    |   1 -
 lightning-background-processor/src/lib.rs | 103 ++++---
 lightning-persister/Cargo.toml            |   2 -
 lightning-persister/src/lib.rs            | 338 ----------------------
 lightning-persister/src/util.rs           | 188 ------------
 lightning/src/util/persist.rs             |  51 ++--
 6 files changed, 94 insertions(+), 589 deletions(-)
 delete mode 100644 lightning-persister/src/util.rs

diff --git a/bench/benches/bench.rs b/bench/benches/bench.rs
index 54799f44c95..3fc3abe687b 100644
--- a/bench/benches/bench.rs
+++ b/bench/benches/bench.rs
@@ -15,7 +15,6 @@ criterion_group!(benches,
 	lightning::routing::router::benches::generate_large_mpp_routes_with_probabilistic_scorer,
 	lightning::sign::benches::bench_get_secure_random_bytes,
 	lightning::ln::channelmanager::bench::bench_sends,
-	lightning_persister::bench::bench_sends,
 	lightning_rapid_gossip_sync::bench::bench_reading_full_graph_from_file,
 	lightning::routing::gossip::benches::read_network_graph,
 	lightning::routing::gossip::benches::write_network_graph);
diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs
index 8648920ec2c..353ed6738d6 100644
--- a/lightning-background-processor/src/lib.rs
+++ b/lightning-background-processor/src/lib.rs
@@ -500,9 +500,16 @@ use core::task;
 /// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
 /// could setup `process_events_async` like this:
 /// ```
-/// # struct MyPersister {}
-/// # impl lightning::util::persist::KVStorePersister for MyPersister {
-/// #     fn persist<W: lightning::util::ser::Writeable>(&self, key: &str, object: &W) -> lightning::io::Result<()> { Ok(()) }
+/// # use lightning::io;
+/// # use std::sync::{Arc, Mutex};
+/// # use std::sync::atomic::{AtomicBool, Ordering};
+/// # use lightning_background_processor::{process_events_async, GossipSync};
+/// # struct MyStore {}
+/// # impl lightning::util::persist::KVStore for MyStore {
+/// #     fn read(&self, namespace: &str, sub_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
+/// #     fn write(&self, namespace: &str, sub_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
+/// #     fn remove(&self, namespace: &str, sub_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
+/// #     fn list(&self, namespace: &str, sub_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
 /// # }
 /// # struct MyEventHandler {}
 /// # impl MyEventHandler {
@@ -514,23 +521,20 @@ use core::task;
 /// #     fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
 /// #     fn disconnect_socket(&mut self) {}
 /// # }
-/// # use std::sync::{Arc, Mutex};
-/// # use std::sync::atomic::{AtomicBool, Ordering};
-/// # use lightning_background_processor::{process_events_async, GossipSync};
 /// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync;
 /// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync;
 /// # type MyNodeSigner = dyn lightning::sign::NodeSigner + Send + Sync;
 /// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync;
 /// # type MyFilter = dyn lightning::chain::Filter + Send + Sync;
 /// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync;
-/// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyPersister>>;
+/// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyStore>>;
 /// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyUtxoLookup, MyLogger>;
 /// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<MyLogger>>;
 /// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync<Arc<MyNetworkGraph>, Arc<MyUtxoLookup>, Arc<MyLogger>>;
 /// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyLogger>;
 /// # type MyScorer = Mutex<lightning::routing::scoring::ProbabilisticScorer<Arc<MyNetworkGraph>, Arc<MyLogger>>>;
 ///
-/// # async fn setup_background_processing(my_persister: Arc<MyPersister>, my_event_handler: Arc<MyEventHandler>, my_chain_monitor: Arc<MyChainMonitor>, my_channel_manager: Arc<MyChannelManager>, my_gossip_sync: Arc<MyGossipSync>, my_logger: Arc<MyLogger>, my_scorer: Arc<MyScorer>, my_peer_manager: Arc<MyPeerManager>) {
+/// # async fn setup_background_processing(my_persister: Arc<MyStore>, my_event_handler: Arc<MyEventHandler>, my_chain_monitor: Arc<MyChainMonitor>, my_channel_manager: Arc<MyChannelManager>, my_gossip_sync: Arc<MyGossipSync>, my_logger: Arc<MyLogger>, my_scorer: Arc<MyScorer>, my_peer_manager: Arc<MyPeerManager>) {
 ///	let background_persister = Arc::clone(&my_persister);
 ///	let background_event_handler = Arc::clone(&my_event_handler);
 ///	let background_chain_mon = Arc::clone(&my_chain_monitor);
@@ -866,8 +870,8 @@ mod tests {
 	use lightning::util::config::UserConfig;
 	use lightning::util::ser::Writeable;
 	use lightning::util::test_utils;
-	use lightning::util::persist::KVStorePersister;
-	use lightning_persister::FilesystemPersister;
+	use lightning::util::persist::{KVStore, CHANNEL_MANAGER_PERSISTENCE_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SUB_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SUB_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, SCORER_PERSISTENCE_NAMESPACE, SCORER_PERSISTENCE_SUB_NAMESPACE, SCORER_PERSISTENCE_KEY};
+	use lightning_persister::fs_store::FilesystemStore;
 	use std::collections::VecDeque;
 	use std::{fs, env};
 	use std::path::PathBuf;
@@ -906,7 +910,7 @@ mod tests {
 			>,
 			Arc<test_utils::TestLogger>>;
 
-	type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
+	type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemStore>>;
 
 	type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
 	type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
@@ -917,7 +921,7 @@ mod tests {
 		rapid_gossip_sync: RGS,
 		peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
 		chain_monitor: Arc<ChainMonitor>,
-		persister: Arc<FilesystemPersister>,
+		kv_store: Arc<FilesystemStore>,
 		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
 		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
 		logger: Arc<test_utils::TestLogger>,
@@ -941,9 +945,9 @@ mod tests {
 
 	impl Drop for Node {
 		fn drop(&mut self) {
-			let data_dir = self.persister.get_data_dir();
+			let data_dir = self.kv_store.get_data_dir();
 			match fs::remove_dir_all(data_dir.clone()) {
-				Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
+				Err(e) => println!("Failed to remove test store directory {}: {}", data_dir.display(), e),
 				_ => {}
 			}
 		}
@@ -954,13 +958,13 @@ mod tests {
 		graph_persistence_notifier: Option<SyncSender<()>>,
 		manager_error: Option<(std::io::ErrorKind, &'static str)>,
 		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
-		filesystem_persister: FilesystemPersister,
+		kv_store: FilesystemStore,
 	}
 
 	impl Persister {
-		fn new(data_dir: String) -> Self {
-			let filesystem_persister = FilesystemPersister::new(data_dir);
-			Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
+		fn new(data_dir: PathBuf) -> Self {
+			let kv_store = FilesystemStore::new(data_dir);
+			Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, kv_store }
 		}
 
 		fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
@@ -980,15 +984,25 @@ mod tests {
 		}
 	}
 
-	impl KVStorePersister for Persister {
-		fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
-			if key == "manager" {
+	impl KVStore for Persister {
+		fn read(&self, namespace: &str, sub_namespace: &str, key: &str) -> lightning::io::Result<Vec<u8>> {
+			self.kv_store.read(namespace, sub_namespace, key)
+		}
+
+		fn write(&self, namespace: &str, sub_namespace: &str, key: &str, buf: &[u8]) -> lightning::io::Result<()> {
+			if namespace == CHANNEL_MANAGER_PERSISTENCE_NAMESPACE &&
+				sub_namespace == CHANNEL_MANAGER_PERSISTENCE_SUB_NAMESPACE &&
+				key == CHANNEL_MANAGER_PERSISTENCE_KEY
+			{
 				if let Some((error, message)) = self.manager_error {
 					return Err(std::io::Error::new(error, message))
 				}
 			}
 
-			if key == "network_graph" {
+			if namespace == NETWORK_GRAPH_PERSISTENCE_NAMESPACE &&
+				sub_namespace == NETWORK_GRAPH_PERSISTENCE_SUB_NAMESPACE &&
+				key == NETWORK_GRAPH_PERSISTENCE_KEY
+			{
 				if let Some(sender) = &self.graph_persistence_notifier {
 					match sender.send(()) {
 						Ok(()) => {},
@@ -1001,13 +1015,24 @@ mod tests {
 				}
 			}
 
-			if key == "scorer" {
+			if namespace == SCORER_PERSISTENCE_NAMESPACE &&
+				sub_namespace == SCORER_PERSISTENCE_SUB_NAMESPACE &&
+				key == SCORER_PERSISTENCE_KEY
+			{
 				if let Some((error, message)) = self.scorer_error {
 					return Err(std::io::Error::new(error, message))
 				}
 			}
 
-			self.filesystem_persister.persist(key, object)
+			self.kv_store.write(namespace, sub_namespace, key, buf)
+		}
+
+		fn remove(&self, namespace: &str, sub_namespace: &str, key: &str, lazy: bool) -> lightning::io::Result<()> {
+			self.kv_store.remove(namespace, sub_namespace, key, lazy)
+		}
+
+		fn list(&self, namespace: &str, sub_namespace: &str) -> lightning::io::Result<Vec<String>> {
+			self.kv_store.list(namespace, sub_namespace)
 		}
 	}
 
@@ -1157,10 +1182,10 @@ mod tests {
 			let seed = [i as u8; 32];
 			let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone(), ()));
 			let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin));
-			let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", &persist_dir, i)));
+			let kv_store = Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into()));
 			let now = Duration::from_secs(genesis_block.header.time as u64);
 			let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
-			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
+			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), kv_store.clone()));
 			let best_block = BestBlock::from_network(network);
 			let params = ChainParameters { network, best_block };
 			let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params, genesis_block.header.time));
@@ -1172,7 +1197,7 @@ mod tests {
 				onion_message_handler: IgnoringMessageHandler{}, custom_message_handler: IgnoringMessageHandler{}
 			};
 			let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), keys_manager.clone()));
-			let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
+			let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, kv_store, tx_broadcaster, network_graph, logger, best_block, scorer };
 			nodes.push(node);
 		}
 
@@ -1267,7 +1292,7 @@ mod tests {
 		let tx = open_channel!(nodes[0], nodes[1], 100000);
 
 		// Initiate the background processors to watch each node.
-		let data_dir = nodes[0].persister.get_data_dir();
+		let data_dir = nodes[0].kv_store.get_data_dir();
 		let persister = Arc::new(Persister::new(data_dir));
 		let event_handler = |_: _| {};
 		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
@@ -1332,7 +1357,7 @@ mod tests {
 		// `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`, and
 		// `PeerManager::timer_tick_occurred` every `PING_TIMER`.
 		let (_, nodes) = create_nodes(1, "test_timer_tick_called");
-		let data_dir = nodes[0].persister.get_data_dir();
+		let data_dir = nodes[0].kv_store.get_data_dir();
 		let persister = Arc::new(Persister::new(data_dir));
 		let event_handler = |_: _| {};
 		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
@@ -1359,7 +1384,7 @@ mod tests {
 		let (_, nodes) = create_nodes(2, "test_persist_error");
 		open_channel!(nodes[0], nodes[1], 100000);
 
-		let data_dir = nodes[0].persister.get_data_dir();
+		let data_dir = nodes[0].kv_store.get_data_dir();
 		let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
 		let event_handler = |_: _| {};
 		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
@@ -1379,7 +1404,7 @@ mod tests {
 		let (_, nodes) = create_nodes(2, "test_persist_error_sync");
 		open_channel!(nodes[0], nodes[1], 100000);
 
-		let data_dir = nodes[0].persister.get_data_dir();
+		let data_dir = nodes[0].kv_store.get_data_dir();
 		let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
 
 		let bp_future = super::process_events_async(
@@ -1405,7 +1430,7 @@ mod tests {
 	fn test_network_graph_persist_error() {
 		// Test that if we encounter an error during network graph persistence, an error gets returned.
 		let (_, nodes) = create_nodes(2, "test_persist_network_graph_error");
-		let data_dir = nodes[0].persister.get_data_dir();
+		let data_dir = nodes[0].kv_store.get_data_dir();
 		let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
 		let event_handler = |_: _| {};
 		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
@@ -1423,7 +1448,7 @@ mod tests {
 	fn test_scorer_persist_error() {
 		// Test that if we encounter an error during scorer persistence, an error gets returned.
 		let (_, nodes) = create_nodes(2, "test_persist_scorer_error");
-		let data_dir = nodes[0].persister.get_data_dir();
+		let data_dir = nodes[0].kv_store.get_data_dir();
 		let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
 		let event_handler = |_: _| {};
 		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
@@ -1441,7 +1466,7 @@ mod tests {
 	fn test_background_event_handling() {
 		let (_, mut nodes) = create_nodes(2, "test_background_event_handling");
 		let channel_value = 100000;
-		let data_dir = nodes[0].persister.get_data_dir();
+		let data_dir = nodes[0].kv_store.get_data_dir();
 		let persister = Arc::new(Persister::new(data_dir.clone()));
 
 		// Set up a background event handler for FundingGenerationReady events.
@@ -1514,7 +1539,7 @@ mod tests {
 	#[test]
 	fn test_scorer_persistence() {
 		let (_, nodes) = create_nodes(2, "test_scorer_persistence");
-		let data_dir = nodes[0].persister.get_data_dir();
+		let data_dir = nodes[0].kv_store.get_data_dir();
 		let persister = Arc::new(Persister::new(data_dir));
 		let event_handler = |_: _| {};
 		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
@@ -1586,7 +1611,7 @@ mod tests {
 		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
 
 		let (_, nodes) = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion");
-		let data_dir = nodes[0].persister.get_data_dir();
+		let data_dir = nodes[0].kv_store.get_data_dir();
 		let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
 
 		let event_handler = |_: _| {};
@@ -1605,7 +1630,7 @@ mod tests {
 		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
 
 		let (_, nodes) = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async");
-		let data_dir = nodes[0].persister.get_data_dir();
+		let data_dir = nodes[0].kv_store.get_data_dir();
 		let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
 
 		let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
@@ -1745,7 +1770,7 @@ mod tests {
 		};
 
 		let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
-		let data_dir = nodes[0].persister.get_data_dir();
+		let data_dir = nodes[0].kv_store.get_data_dir();
 		let persister = Arc::new(Persister::new(data_dir));
 		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
@@ -1778,7 +1803,7 @@ mod tests {
 		};
 
 		let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async");
-		let data_dir = nodes[0].persister.get_data_dir();
+		let data_dir = nodes[0].kv_store.get_data_dir();
 		let persister = Arc::new(Persister::new(data_dir));
 
 		let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
diff --git a/lightning-persister/Cargo.toml b/lightning-persister/Cargo.toml
index 271f3b882b3..7ba4223227d 100644
--- a/lightning-persister/Cargo.toml
+++ b/lightning-persister/Cargo.toml
@@ -16,10 +16,8 @@ rustdoc-args = ["--cfg", "docsrs"]
 [dependencies]
 bitcoin = "0.29.0"
 lightning = { version = "0.0.116", path = "../lightning" }
-libc = "0.2"
 
 [target.'cfg(windows)'.dependencies]
-winapi = { version = "0.3", features = ["winbase"] }
 windows-sys = { version = "0.48.0", default-features = false, features = ["Win32_Storage_FileSystem", "Win32_Foundation"] }
 
 [target.'cfg(ldk_bench)'.dependencies]
diff --git a/lightning-persister/src/lib.rs b/lightning-persister/src/lib.rs
index 932e4f41ad3..ae258e137d7 100644
--- a/lightning-persister/src/lib.rs
+++ b/lightning-persister/src/lib.rs
@@ -16,341 +16,3 @@ mod utils;
 
 #[cfg(test)]
 mod test_utils;
-
-mod util;
-
-extern crate lightning;
-extern crate bitcoin;
-extern crate libc;
-
-use bitcoin::hash_types::{BlockHash, Txid};
-use bitcoin::hashes::hex::FromHex;
-use lightning::chain::channelmonitor::ChannelMonitor;
-use lightning::sign::{EntropySource, SignerProvider};
-use lightning::util::ser::{ReadableArgs, Writeable};
-use lightning::util::persist::KVStorePersister;
-use std::fs;
-use std::io::Cursor;
-use std::ops::Deref;
-use std::path::{Path, PathBuf};
-
-/// FilesystemPersister persists channel data on disk, where each channel's
-/// data is stored in a file named after its funding outpoint.
-///
-/// Warning: this module does the best it can with calls to persist data, but it
-/// can only guarantee that the data is passed to the drive. It is up to the
-/// drive manufacturers to do the actual persistence properly, which they often
-/// don't (especially on consumer-grade hardware). Therefore, it is up to the
-/// user to validate their entire storage stack, to ensure the writes are
-/// persistent.
-/// Corollary: especially when dealing with larger amounts of money, it is best
-/// practice to have multiple channel data backups and not rely only on one
-/// FilesystemPersister.
-pub struct FilesystemPersister {
-	path_to_channel_data: String,
-}
-
-impl FilesystemPersister {
-	/// Initialize a new FilesystemPersister and set the path to the individual channels'
-	/// files.
-	pub fn new(path_to_channel_data: String) -> Self {
-		Self {
-			path_to_channel_data,
-		}
-	}
-
-	/// Get the directory which was provided when this persister was initialized.
-	pub fn get_data_dir(&self) -> String {
-		self.path_to_channel_data.clone()
-	}
-
-	/// Read `ChannelMonitor`s from disk.
-	pub fn read_channelmonitors<ES: Deref, SP: Deref> (
-		&self, entropy_source: ES, signer_provider: SP
-	) -> std::io::Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>>
-		where
-			ES::Target: EntropySource + Sized,
-			SP::Target: SignerProvider + Sized
-	{
-		let mut path = PathBuf::from(&self.path_to_channel_data);
-		path.push("monitors");
-		if !Path::new(&path).exists() {
-			return Ok(Vec::new());
-		}
-		let mut res = Vec::new();
-		for file_option in fs::read_dir(path)? {
-			let file = file_option.unwrap();
-			let owned_file_name = file.file_name();
-			let filename = owned_file_name.to_str()
-				.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData,
-					"File name is not a valid utf8 string"))?;
-			if !filename.is_ascii() || filename.len() < 65 {
-				return Err(std::io::Error::new(
-					std::io::ErrorKind::InvalidData,
-					"Invalid ChannelMonitor file name",
-				));
-			}
-			if filename.ends_with(".tmp") {
-				// If we were in the middle of committing an new update and crashed, it should be
-				// safe to ignore the update - we should never have returned to the caller and
-				// irrevocably committed to the new state in any way.
-				continue;
-			}
-
-			let txid: Txid = Txid::from_hex(filename.split_at(64).0)
-				.map_err(|_| std::io::Error::new(
-					std::io::ErrorKind::InvalidData,
-					"Invalid tx ID in filename",
-				))?;
-
-			let index: u16 = filename.split_at(65).1.parse()
-				.map_err(|_| std::io::Error::new(
-					std::io::ErrorKind::InvalidData,
-					"Invalid tx index in filename",
-				))?;
-
-			let contents = fs::read(&file.path())?;
-			let mut buffer = Cursor::new(&contents);
-			match <(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>::read(&mut buffer, (&*entropy_source, &*signer_provider)) {
-				Ok((blockhash, channel_monitor)) => {
-					if channel_monitor.get_funding_txo().0.txid != txid || channel_monitor.get_funding_txo().0.index != index {
-						return Err(std::io::Error::new(std::io::ErrorKind::InvalidData,
-									       "ChannelMonitor was stored in the wrong file"));
-					}
-					res.push((blockhash, channel_monitor));
-				}
-				Err(e) => return Err(std::io::Error::new(
-					std::io::ErrorKind::InvalidData,
-					format!("Failed to deserialize ChannelMonitor: {}", e),
-				))
-			}
-		}
-		Ok(res)
-	}
-}
-
-impl KVStorePersister for FilesystemPersister {
-	fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
-		let mut dest_file = PathBuf::from(self.path_to_channel_data.clone());
-		dest_file.push(key);
-		util::write_to_file(dest_file, object)
-	}
-}
-
-#[cfg(test)]
-mod tests {
-	extern crate lightning;
-	extern crate bitcoin;
-	use crate::FilesystemPersister;
-	use bitcoin::hashes::hex::FromHex;
-	use bitcoin::Txid;
-	use lightning::chain::ChannelMonitorUpdateStatus;
-	use lightning::chain::chainmonitor::Persist;
-	use lightning::chain::channelmonitor::CLOSED_CHANNEL_UPDATE_ID;
-	use lightning::chain::transaction::OutPoint;
-	use lightning::{check_closed_broadcast, check_closed_event, check_added_monitors};
-	use lightning::events::{ClosureReason, MessageSendEventsProvider};
-	use lightning::ln::functional_test_utils::*;
-	use lightning::util::test_utils;
-	use std::fs;
-	#[cfg(target_os = "windows")]
-	use {
-		lightning::get_event_msg,
-		lightning::ln::msgs::ChannelMessageHandler,
-	};
-
-	impl Drop for FilesystemPersister {
-		fn drop(&mut self) {
-			// We test for invalid directory names, so it's OK if directory removal
-			// fails.
-			match fs::remove_dir_all(&self.path_to_channel_data) {
-				Err(e) => println!("Failed to remove test persister directory: {}", e),
-				_ => {}
-			}
-		}
-	}
-
-	#[test]
-	fn test_if_monitors_is_not_dir() {
-		let persister = FilesystemPersister::new("test_monitors_is_not_dir".to_string());
-
-		fs::create_dir_all(&persister.path_to_channel_data).unwrap();
-		let mut path = std::path::PathBuf::from(&persister.path_to_channel_data);
-		path.push("monitors");
-		fs::File::create(path).unwrap();
-
-		let chanmon_cfgs = create_chanmon_cfgs(1);
-		let mut node_cfgs = create_node_cfgs(1, &chanmon_cfgs);
-		let chain_mon_0 = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[0].chain_source), &chanmon_cfgs[0].tx_broadcaster, &chanmon_cfgs[0].logger, &chanmon_cfgs[0].fee_estimator, &persister, node_cfgs[0].keys_manager);
-		node_cfgs[0].chain_monitor = chain_mon_0;
-		let node_chanmgrs = create_node_chanmgrs(1, &node_cfgs, &[None]);
-		let nodes = create_network(1, &node_cfgs, &node_chanmgrs);
-
-		// Check that read_channelmonitors() returns error if monitors/ is not a
-		// directory.
-		assert!(persister.read_channelmonitors(nodes[0].keys_manager, nodes[0].keys_manager).is_err());
-	}
-
-	// Integration-test the FilesystemPersister. Test relaying a few payments
-	// and check that the persisted data is updated the appropriate number of
-	// times.
-	#[test]
-	fn test_filesystem_persister() {
-		// Create the nodes, giving them FilesystemPersisters for data persisters.
-		let persister_0 = FilesystemPersister::new("test_filesystem_persister_0".to_string());
-		let persister_1 = FilesystemPersister::new("test_filesystem_persister_1".to_string());
-		let chanmon_cfgs = create_chanmon_cfgs(2);
-		let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
-		let chain_mon_0 = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[0].chain_source), &chanmon_cfgs[0].tx_broadcaster, &chanmon_cfgs[0].logger, &chanmon_cfgs[0].fee_estimator, &persister_0, node_cfgs[0].keys_manager);
-		let chain_mon_1 = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[1].chain_source), &chanmon_cfgs[1].tx_broadcaster, &chanmon_cfgs[1].logger, &chanmon_cfgs[1].fee_estimator, &persister_1, node_cfgs[1].keys_manager);
-		node_cfgs[0].chain_monitor = chain_mon_0;
-		node_cfgs[1].chain_monitor = chain_mon_1;
-		let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
-		let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
-
-		// Check that the persisted channel data is empty before any channels are
-		// open.
-		let mut persisted_chan_data_0 = persister_0.read_channelmonitors(nodes[0].keys_manager, nodes[0].keys_manager).unwrap();
-		assert_eq!(persisted_chan_data_0.len(), 0);
-		let mut persisted_chan_data_1 = persister_1.read_channelmonitors(nodes[1].keys_manager, nodes[1].keys_manager).unwrap();
-		assert_eq!(persisted_chan_data_1.len(), 0);
-
-		// Helper to make sure the channel is on the expected update ID.
-		macro_rules! check_persisted_data {
-			($expected_update_id: expr) => {
-				persisted_chan_data_0 = persister_0.read_channelmonitors(nodes[0].keys_manager, nodes[0].keys_manager).unwrap();
-				assert_eq!(persisted_chan_data_0.len(), 1);
-				for (_, mon) in persisted_chan_data_0.iter() {
-					assert_eq!(mon.get_latest_update_id(), $expected_update_id);
-				}
-				persisted_chan_data_1 = persister_1.read_channelmonitors(nodes[1].keys_manager, nodes[1].keys_manager).unwrap();
-				assert_eq!(persisted_chan_data_1.len(), 1);
-				for (_, mon) in persisted_chan_data_1.iter() {
-					assert_eq!(mon.get_latest_update_id(), $expected_update_id);
-				}
-			}
-		}
-
-		// Create some initial channel and check that a channel was persisted.
-		let _ = create_announced_chan_between_nodes(&nodes, 0, 1);
-		check_persisted_data!(0);
-
-		// Send a few payments and make sure the monitors are updated to the latest.
-		send_payment(&nodes[0], &vec!(&nodes[1])[..], 8000000);
-		check_persisted_data!(5);
-		send_payment(&nodes[1], &vec!(&nodes[0])[..], 4000000);
-		check_persisted_data!(10);
-
-		// Force close because cooperative close doesn't result in any persisted
-		// updates.
-		nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
-		check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed, [nodes[1].node.get_our_node_id()], 100000);
-		check_closed_broadcast!(nodes[0], true);
-		check_added_monitors!(nodes[0], 1);
-
-		let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap();
-		assert_eq!(node_txn.len(), 1);
-
-		connect_block(&nodes[1], &create_dummy_block(nodes[0].best_block_hash(), 42, vec![node_txn[0].clone(), node_txn[0].clone()]));
-		check_closed_broadcast!(nodes[1], true);
-		check_closed_event!(nodes[1], 1, ClosureReason::CommitmentTxConfirmed, [nodes[0].node.get_our_node_id()], 100000);
-		check_added_monitors!(nodes[1], 1);
-
-		// Make sure everything is persisted as expected after close.
-		check_persisted_data!(CLOSED_CHANNEL_UPDATE_ID);
-	}
-
-	// Test that if the persister's path to channel data is read-only, writing a
-	// monitor to it results in the persister returning a PermanentFailure.
-	// Windows ignores the read-only flag for folders, so this test is Unix-only.
-	#[cfg(not(target_os = "windows"))]
-	#[test]
-	fn test_readonly_dir_perm_failure() {
-		let persister = FilesystemPersister::new("test_readonly_dir_perm_failure".to_string());
-		fs::create_dir_all(&persister.path_to_channel_data).unwrap();
-
-		// Set up a dummy channel and force close. This will produce a monitor
-		// that we can then use to test persistence.
-		let chanmon_cfgs = create_chanmon_cfgs(2);
-		let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
-		let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
-		let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
-		let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
-		nodes[1].node.force_close_broadcasting_latest_txn(&chan.2, &nodes[0].node.get_our_node_id()).unwrap();
-		check_closed_event!(nodes[1], 1, ClosureReason::HolderForceClosed, [nodes[0].node.get_our_node_id()], 100000);
-		let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap();
-		let update_map = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap();
-		let update_id = update_map.get(&added_monitors[0].0.to_channel_id()).unwrap();
-
-		// Set the persister's directory to read-only, which should result in
-		// returning a permanent failure when we then attempt to persist a
-		// channel update.
-		let path = &persister.path_to_channel_data;
-		let mut perms = fs::metadata(path).unwrap().permissions();
-		perms.set_readonly(true);
-		fs::set_permissions(path, perms).unwrap();
-
-		let test_txo = OutPoint {
-			txid: Txid::from_hex("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap(),
-			index: 0
-		};
-		match persister.persist_new_channel(test_txo, &added_monitors[0].1, update_id.2) {
-			ChannelMonitorUpdateStatus::PermanentFailure => {},
-			_ => panic!("unexpected result from persisting new channel")
-		}
-
-		nodes[1].node.get_and_clear_pending_msg_events();
-		added_monitors.clear();
-	}
-
-	// Test that if a persister's directory name is invalid, monitor persistence
-	// will fail.
-	#[cfg(target_os = "windows")]
-	#[test]
-	fn test_fail_on_open() {
-		// Set up a dummy channel and force close. This will produce a monitor
-		// that we can then use to test persistence.
-		let chanmon_cfgs = create_chanmon_cfgs(2);
-		let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
-		let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
-		let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
-		let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
-		nodes[1].node.force_close_broadcasting_latest_txn(&chan.2, &nodes[0].node.get_our_node_id()).unwrap();
-		check_closed_event!(nodes[1], 1, ClosureReason::HolderForceClosed, [nodes[0].node.get_our_node_id()], 100000);
-		let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap();
-		let update_map = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap();
-		let update_id = update_map.get(&added_monitors[0].0.to_channel_id()).unwrap();
-
-		// Create the persister with an invalid directory name and test that the
-		// channel fails to open because the directories fail to be created. There
-		// don't seem to be invalid filename characters on Unix that Rust doesn't
-		// handle, hence why the test is Windows-only.
-		let persister = FilesystemPersister::new(":<>/".to_string());
-
-		let test_txo = OutPoint {
-			txid: Txid::from_hex("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap(),
-			index: 0
-		};
-		match persister.persist_new_channel(test_txo, &added_monitors[0].1, update_id.2) {
-			ChannelMonitorUpdateStatus::PermanentFailure => {},
-			_ => panic!("unexpected result from persisting new channel")
-		}
-
-		nodes[1].node.get_and_clear_pending_msg_events();
-		added_monitors.clear();
-	}
-}
-
-#[cfg(ldk_bench)]
-/// Benches
-pub mod bench {
-	use criterion::Criterion;
-
-	/// Bench!
-	pub fn bench_sends(bench: &mut Criterion) {
-		let persister_a = super::FilesystemPersister::new("bench_filesystem_persister_a".to_string());
-		let persister_b = super::FilesystemPersister::new("bench_filesystem_persister_b".to_string());
-		lightning::ln::channelmanager::bench::bench_two_sends(
-			bench, "bench_filesystem_persisted_sends", persister_a, persister_b);
-	}
-}
diff --git a/lightning-persister/src/util.rs b/lightning-persister/src/util.rs
deleted file mode 100644
index 20c4a815185..00000000000
--- a/lightning-persister/src/util.rs
+++ /dev/null
@@ -1,188 +0,0 @@
-#[cfg(target_os = "windows")]
-extern crate winapi;
-
-use std::fs;
-use std::path::PathBuf;
-use std::io::BufWriter;
-
-#[cfg(not(target_os = "windows"))]
-use std::os::unix::io::AsRawFd;
-
-use lightning::util::ser::Writeable;
-
-#[cfg(target_os = "windows")]
-use {
-	std::ffi::OsStr,
-	std::os::windows::ffi::OsStrExt
-};
-
-#[cfg(target_os = "windows")]
-macro_rules! call {
-	($e: expr) => (
-		if $e != 0 {
-			return Ok(())
-		} else {
-			return Err(std::io::Error::last_os_error())
-		}
-	)
-}
-
-#[cfg(target_os = "windows")]
-fn path_to_windows_str<T: AsRef<OsStr>>(path: T) -> Vec<winapi::shared::ntdef::WCHAR> {
-	path.as_ref().encode_wide().chain(Some(0)).collect()
-}
-
-#[allow(bare_trait_objects)]
-pub(crate) fn write_to_file<W: Writeable>(dest_file: PathBuf, data: &W) -> std::io::Result<()> {
-	let mut tmp_file = dest_file.clone();
-	tmp_file.set_extension("tmp");
-
-	let parent_directory = dest_file.parent().unwrap();
-	fs::create_dir_all(parent_directory)?;
-	// Do a crazy dance with lots of fsync()s to be overly cautious here...
-	// We never want to end up in a state where we've lost the old data, or end up using the
-	// old data on power loss after we've returned.
-	// The way to atomically write a file on Unix platforms is:
-	// open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
-	{
-		// Note that going by rust-lang/rust@d602a6b, on MacOS it is only safe to use
-		// rust stdlib 1.36 or higher.
-		let mut buf = BufWriter::new(fs::File::create(&tmp_file)?);
-		data.write(&mut buf)?;
-		buf.into_inner()?.sync_all()?;
-	}
-	// Fsync the parent directory on Unix.
-	#[cfg(not(target_os = "windows"))]
-	{
-		fs::rename(&tmp_file, &dest_file)?;
-		let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?;
-		unsafe { libc::fsync(dir_file.as_raw_fd()); }
-	}
-	#[cfg(target_os = "windows")]
-	{
-		if dest_file.exists() {
-			unsafe {winapi::um::winbase::ReplaceFileW(
-				path_to_windows_str(dest_file).as_ptr(), path_to_windows_str(tmp_file).as_ptr(), std::ptr::null(),
-				winapi::um::winbase::REPLACEFILE_IGNORE_MERGE_ERRORS,
-				std::ptr::null_mut() as *mut winapi::ctypes::c_void,
-				std::ptr::null_mut() as *mut winapi::ctypes::c_void
-			)};
-		} else {
-			call!(unsafe {winapi::um::winbase::MoveFileExW(
-				path_to_windows_str(tmp_file).as_ptr(), path_to_windows_str(dest_file).as_ptr(),
-				winapi::um::winbase::MOVEFILE_WRITE_THROUGH | winapi::um::winbase::MOVEFILE_REPLACE_EXISTING
-			)});
-		}
-	}
-	Ok(())
-}
-
-#[cfg(test)]
-mod tests {
-	use lightning::util::ser::{Writer, Writeable};
-
-	use super::{write_to_file};
-	use std::fs;
-	use std::io;
-	use std::path::PathBuf;
-
-	struct TestWriteable{}
-	impl Writeable for TestWriteable {
-		fn write<W: Writer>(&self, writer: &mut W) -> Result<(), std::io::Error> {
-			writer.write_all(&[42; 1])
-		}
-	}
-
-	// Test that if the persister's path to channel data is read-only, writing
-	// data to it fails. Windows ignores the read-only flag for folders, so this
-	// test is Unix-only.
-	#[cfg(not(target_os = "windows"))]
-	#[test]
-	fn test_readonly_dir() {
-		let test_writeable = TestWriteable{};
-		let filename = "test_readonly_dir_persister_filename".to_string();
-		let path = "test_readonly_dir_persister_dir";
-		fs::create_dir_all(path).unwrap();
-		let mut perms = fs::metadata(path).unwrap().permissions();
-		perms.set_readonly(true);
-		fs::set_permissions(path, perms).unwrap();
-		let mut dest_file = PathBuf::from(path);
-		dest_file.push(filename);
-		match write_to_file(dest_file, &test_writeable) {
-			Err(e) => assert_eq!(e.kind(), io::ErrorKind::PermissionDenied),
-			_ => panic!("Unexpected error message")
-		}
-	}
-
-	// Test failure to rename in the process of atomically creating a channel
-	// monitor's file. We induce this failure by making the `tmp` file a
-	// directory.
-	// Explanation: given "from" = the file being renamed, "to" = the destination
-	// file that already exists: Unix should fail because if "from" is a file,
-	// then "to" is also required to be a file.
-	// TODO: ideally try to make this work on Windows again
-	#[cfg(not(target_os = "windows"))]
-	#[test]
-	fn test_rename_failure() {
-		let test_writeable = TestWriteable{};
-		let filename = "test_rename_failure_filename";
-		let path = "test_rename_failure_dir";
-		let mut dest_file = PathBuf::from(path);
-		dest_file.push(filename);
-		// Create the channel data file and make it a directory.
-		fs::create_dir_all(dest_file.clone()).unwrap();
-		match write_to_file(dest_file, &test_writeable) {
-			Err(e) => assert_eq!(e.raw_os_error(), Some(libc::EISDIR)),
-			_ => panic!("Unexpected Ok(())")
-		}
-		fs::remove_dir_all(path).unwrap();
-	}
-
-	#[test]
-	fn test_diskwriteable_failure() {
-		struct FailingWriteable {}
-		impl Writeable for FailingWriteable {
-			fn write<W: Writer>(&self, _writer: &mut W) -> Result<(), std::io::Error> {
-				Err(std::io::Error::new(std::io::ErrorKind::Other, "expected failure"))
-			}
-		}
-
-		let filename = "test_diskwriteable_failure";
-		let path = "test_diskwriteable_failure_dir";
-		let test_writeable = FailingWriteable{};
-		let mut dest_file = PathBuf::from(path);
-		dest_file.push(filename);
-		match write_to_file(dest_file, &test_writeable) {
-			Err(e) => {
-				assert_eq!(e.kind(), std::io::ErrorKind::Other);
-				assert_eq!(e.get_ref().unwrap().to_string(), "expected failure");
-			},
-			_ => panic!("unexpected result")
-		}
-		fs::remove_dir_all(path).unwrap();
-	}
-
-	// Test failure to create the temporary file in the persistence process.
-	// We induce this failure by having the temp file already exist and be a
-	// directory.
-	#[test]
-	fn test_tmp_file_creation_failure() {
-		let test_writeable = TestWriteable{};
-		let filename = "test_tmp_file_creation_failure_filename".to_string();
-		let path = "test_tmp_file_creation_failure_dir";
-		let mut dest_file = PathBuf::from(path);
-		dest_file.push(filename);
-		let mut tmp_file = dest_file.clone();
-		tmp_file.set_extension("tmp");
-		fs::create_dir_all(tmp_file).unwrap();
-		match write_to_file(dest_file, &test_writeable) {
-			Err(e) => {
-				#[cfg(not(target_os = "windows"))]
-				assert_eq!(e.raw_os_error(), Some(libc::EISDIR));
-				#[cfg(target_os = "windows")]
-				assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
-			}
-			_ => panic!("Unexpected error message")
-		}
-	}
-}
diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs
index 5b122b7a570..ca0605c9598 100644
--- a/lightning/src/util/persist.rs
+++ b/lightning/src/util/persist.rs
@@ -114,15 +114,6 @@ pub trait KVStore {
 	fn list(&self, namespace: &str, sub_namespace: &str) -> io::Result<Vec<String>>;
 }
 
-/// Trait for a key-value store for persisting some writeable object at some key
-/// Implementing `KVStorePersister` provides auto-implementations for [`Persister`]
-/// and [`Persist`] traits.  It uses "manager", "network_graph",
-/// and "monitors/{funding_txo_id}_{funding_txo_index}" for keys.
-pub trait KVStorePersister {
-	/// Persist the given writeable using the provided key
-	fn persist<W: Writeable>(&self, key: &str, object: &W) -> io::Result<()>;
-}
-
 /// Trait that handles persisting a [`ChannelManager`], [`NetworkGraph`], and [`WriteableScore`] to disk.
 pub trait Persister<'a, M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref, S: WriteableScore<'a>>
 	where M::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
@@ -144,7 +135,8 @@ pub trait Persister<'a, M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F:
 	fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error>;
 }
 
-impl<'a, A: KVStorePersister, M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref, S: WriteableScore<'a>> Persister<'a, M, T, ES, NS, SP, F, R, L, S> for A
+
+impl<'a, A: KVStore, M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref, S: WriteableScore<'a>> Persister<'a, M, T, ES, NS, SP, F, R, L, S> for A
 	where M::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
 		T::Target: 'static + BroadcasterInterface,
 		ES::Target: 'static + EntropySource,
@@ -154,39 +146,56 @@ impl<'a, A: KVStorePersister, M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Dere
 		R::Target: 'static + Router,
 		L::Target: 'static + Logger,
 {
-	/// Persist the given ['ChannelManager'] to disk with the name "manager", returning an error if persistence failed.
+	/// Persist the given [`ChannelManager`] to disk, returning an error if persistence failed.
 	fn persist_manager(&self, channel_manager: &ChannelManager<M, T, ES, NS, SP, F, R, L>) -> Result<(), io::Error> {
-		self.persist("manager", channel_manager)
+		self.write(CHANNEL_MANAGER_PERSISTENCE_NAMESPACE,
+				   CHANNEL_MANAGER_PERSISTENCE_SUB_NAMESPACE,
+				   CHANNEL_MANAGER_PERSISTENCE_KEY,
+				   &channel_manager.encode())
 	}
 
-	/// Persist the given [`NetworkGraph`] to disk with the name "network_graph", returning an error if persistence failed.
+	/// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed.
 	fn persist_graph(&self, network_graph: &NetworkGraph<L>) -> Result<(), io::Error> {
-		self.persist("network_graph", network_graph)
+		self.write(NETWORK_GRAPH_PERSISTENCE_NAMESPACE,
+				   NETWORK_GRAPH_PERSISTENCE_SUB_NAMESPACE,
+				   NETWORK_GRAPH_PERSISTENCE_KEY,
+				   &network_graph.encode())
 	}
 
-	/// Persist the given [`WriteableScore`] to disk with name "scorer", returning an error if persistence failed.
+	/// Persist the given [`WriteableScore`] to disk, returning an error if persistence failed.
 	fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error> {
-		self.persist("scorer", &scorer)
+		self.write(SCORER_PERSISTENCE_NAMESPACE,
+				   SCORER_PERSISTENCE_SUB_NAMESPACE,
+				   SCORER_PERSISTENCE_KEY,
+				   &scorer.encode())
 	}
 }
 
-impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStorePersister> Persist<ChannelSigner> for K {
+impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStore> Persist<ChannelSigner> for K {
 	// TODO: We really need a way for the persister to inform the user that its time to crash/shut
 	// down once these start returning failure.
 	// A PermanentFailure implies we should probably just shut down the node since we're
 	// force-closing channels without even broadcasting!
 
 	fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
-		let key = format!("monitors/{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
-		match self.persist(&key, monitor) {
+		let key = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
+		match self.write(
+			CHANNEL_MONITOR_PERSISTENCE_NAMESPACE,
+			CHANNEL_MONITOR_PERSISTENCE_SUB_NAMESPACE,
+			&key, &monitor.encode())
+		{
 			Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
 			Err(_) => chain::ChannelMonitorUpdateStatus::PermanentFailure,
 		}
 	}
 
 	fn update_persisted_channel(&self, funding_txo: OutPoint, _update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
-		let key = format!("monitors/{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
-		match self.persist(&key, monitor) {
+		let key = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
+		match self.write(
+			CHANNEL_MONITOR_PERSISTENCE_NAMESPACE,
+			CHANNEL_MONITOR_PERSISTENCE_SUB_NAMESPACE,
+			&key, &monitor.encode())
+		{
 			Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
 			Err(_) => chain::ChannelMonitorUpdateStatus::PermanentFailure,
 		}

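For `KVStore` implementers, the net effect of this hunk is that persistence calls are now addressed by a (namespace, sub-namespace, key) triple instead of a slash-delimited key, with channel monitors keyed "{funding_txid}_{index}" under the "monitors" namespace. Below is a minimal sketch of how a filesystem-style backend might map that triple to a location; the `build_path` helper and the base directory are illustrative assumptions, not the actual `FilesystemStore` layout.

use std::path::PathBuf;

// Illustrative only: compose a storage location from the (namespace, sub_namespace, key)
// triple that the blanket impls above now pass to `KVStore::write`.
fn build_path(base: &str, namespace: &str, sub_namespace: &str, key: &str) -> PathBuf {
	let mut path = PathBuf::from(base);
	if !namespace.is_empty() {
		path.push(namespace);
	}
	if !sub_namespace.is_empty() {
		path.push(sub_namespace);
	}
	path.push(key);
	path
}

fn main() {
	// A monitor previously persisted under the key "monitors/<txid>_0" now maps to the
	// "monitors" namespace with key "<txid>_0", i.e. the same logical location.
	let path = build_path("/data/ldk", "monitors", "", "deadbeef_0");
	println!("{}", path.display());
}

Empty namespace and sub-namespace components, as passed via the constants used above, simply collapse in this sketch, so such objects would sit directly under the base directory.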
From 413f9a7de677631834a1430fdbd2a4e1eaac5d1d Mon Sep 17 00:00:00 2001
From: Elias Rohrer <dev@tnull.de>
Date: Fri, 4 Aug 2023 16:27:39 +0200
Subject: [PATCH 7/9] Migrate `FilesystemPersister` tests to `FilesystemStore`

---
 lightning-persister/Cargo.toml        |   1 +
 lightning-persister/src/fs_store.rs   | 140 +++++++++++++++++++++++++-
 lightning-persister/src/test_utils.rs |  73 +++++++++++++-
 3 files changed, 212 insertions(+), 2 deletions(-)

diff --git a/lightning-persister/Cargo.toml b/lightning-persister/Cargo.toml
index 7ba4223227d..361ab0c57a1 100644
--- a/lightning-persister/Cargo.toml
+++ b/lightning-persister/Cargo.toml
@@ -25,3 +25,4 @@ criterion = { version = "0.4", optional = true, default-features = false }
 
 [dev-dependencies]
 lightning = { version = "0.0.116", path = "../lightning", features = ["_test_utils"] }
+bitcoin = { version = "0.29.0", default-features = false }
diff --git a/lightning-persister/src/fs_store.rs b/lightning-persister/src/fs_store.rs
index f74806d7d8e..638e74e6506 100644
--- a/lightning-persister/src/fs_store.rs
+++ b/lightning-persister/src/fs_store.rs
@@ -367,7 +367,36 @@ impl KVStore for FilesystemStore {
 #[cfg(test)]
 mod tests {
 	use super::*;
-	use crate::test_utils::do_read_write_remove_list_persist;
+	use crate::test_utils::{do_read_write_remove_list_persist, do_test_store};
+
+	use bitcoin::hashes::hex::FromHex;
+	use bitcoin::Txid;
+
+	use lightning::chain::ChannelMonitorUpdateStatus;
+	use lightning::chain::chainmonitor::Persist;
+	use lightning::chain::transaction::OutPoint;
+	use lightning::check_closed_event;
+	use lightning::events::{ClosureReason, MessageSendEventsProvider};
+	use lightning::ln::functional_test_utils::*;
+	use lightning::util::test_utils;
+	use lightning::util::persist::read_channel_monitors;
+	use std::fs;
+	#[cfg(target_os = "windows")]
+	use {
+		lightning::get_event_msg,
+		lightning::ln::msgs::ChannelMessageHandler,
+	};
+
+	impl Drop for FilesystemStore {
+		fn drop(&mut self) {
+			// We test for invalid directory names, so it's OK if directory removal
+			// fails.
+			match fs::remove_dir_all(&self.data_dir) {
+				Err(e) => println!("Failed to remove test persister directory: {}", e),
+				_ => {}
+			}
+		}
+	}
 
 	#[test]
 	fn read_write_remove_list_persist() {
@@ -376,4 +405,113 @@ mod tests {
 		let fs_store = FilesystemStore::new(temp_path);
 		do_read_write_remove_list_persist(&fs_store);
 	}
+
+	#[test]
+	fn test_if_monitors_is_not_dir() {
+		let store = FilesystemStore::new("test_monitors_is_not_dir".into());
+
+		fs::create_dir_all(&store.get_data_dir()).unwrap();
+		let mut path = std::path::PathBuf::from(&store.get_data_dir());
+		path.push("monitors");
+		fs::File::create(path).unwrap();
+
+		let chanmon_cfgs = create_chanmon_cfgs(1);
+		let mut node_cfgs = create_node_cfgs(1, &chanmon_cfgs);
+		let chain_mon_0 = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[0].chain_source), &chanmon_cfgs[0].tx_broadcaster, &chanmon_cfgs[0].logger, &chanmon_cfgs[0].fee_estimator, &store, node_cfgs[0].keys_manager);
+		node_cfgs[0].chain_monitor = chain_mon_0;
+		let node_chanmgrs = create_node_chanmgrs(1, &node_cfgs, &[None]);
+		let nodes = create_network(1, &node_cfgs, &node_chanmgrs);
+
+		// Check that read_channel_monitors() returns error if monitors/ is not a
+		// directory.
+		assert!(read_channel_monitors(&store, nodes[0].keys_manager, nodes[0].keys_manager).is_err());
+	}
+
+	#[test]
+	fn test_filesystem_store() {
+		// Create the nodes, giving them FilesystemStores for data stores.
+		let store_0 = FilesystemStore::new("test_filesystem_store_0".into());
+		let store_1 = FilesystemStore::new("test_filesystem_store_1".into());
+		do_test_store(&store_0, &store_1)
+	}
+
+	// Test that if the store's path to channel data is read-only, writing a
+	// monitor to it results in the store returning a PermanentFailure.
+	// Windows ignores the read-only flag for folders, so this test is Unix-only.
+	#[cfg(not(target_os = "windows"))]
+	#[test]
+	fn test_readonly_dir_perm_failure() {
+		let store = FilesystemStore::new("test_readonly_dir_perm_failure".into());
+		fs::create_dir_all(&store.get_data_dir()).unwrap();
+
+		// Set up a dummy channel and force close. This will produce a monitor
+		// that we can then use to test persistence.
+		let chanmon_cfgs = create_chanmon_cfgs(2);
+		let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+		let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+		let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+		let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
+		nodes[1].node.force_close_broadcasting_latest_txn(&chan.2, &nodes[0].node.get_our_node_id()).unwrap();
+		check_closed_event!(nodes[1], 1, ClosureReason::HolderForceClosed, [nodes[0].node.get_our_node_id()], 100000);
+		let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap();
+		let update_map = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap();
+		let update_id = update_map.get(&added_monitors[0].0.to_channel_id()).unwrap();
+
+		// Set the store's directory to read-only, which should result in
+		// returning a permanent failure when we then attempt to persist a
+		// channel update.
+		let path = &store.get_data_dir();
+		let mut perms = fs::metadata(path).unwrap().permissions();
+		perms.set_readonly(true);
+		fs::set_permissions(path, perms).unwrap();
+
+		let test_txo = OutPoint {
+			txid: Txid::from_hex("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap(),
+			index: 0
+		};
+		match store.persist_new_channel(test_txo, &added_monitors[0].1, update_id.2) {
+			ChannelMonitorUpdateStatus::PermanentFailure => {},
+			_ => panic!("unexpected result from persisting new channel")
+		}
+
+		nodes[1].node.get_and_clear_pending_msg_events();
+		added_monitors.clear();
+	}
+
+	// Test that if a store's directory name is invalid, monitor persistence
+	// will fail.
+	#[cfg(target_os = "windows")]
+	#[test]
+	fn test_fail_on_open() {
+		// Set up a dummy channel and force close. This will produce a monitor
+		// that we can then use to test persistence.
+		let chanmon_cfgs = create_chanmon_cfgs(2);
+		let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+		let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+		let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+		let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
+		nodes[1].node.force_close_broadcasting_latest_txn(&chan.2, &nodes[0].node.get_our_node_id()).unwrap();
+		check_closed_event!(nodes[1], 1, ClosureReason::HolderForceClosed, [nodes[0].node.get_our_node_id()], 100000);
+		let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap();
+		let update_map = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap();
+		let update_id = update_map.get(&added_monitors[0].0.to_channel_id()).unwrap();
+
+		// Create the store with an invalid directory name and test that the
+		// channel fails to open because the directories fail to be created. There
+		// don't seem to be invalid filename characters on Unix that Rust doesn't
+		// handle, which is why the test is Windows-only.
+		let store = FilesystemStore::new(":<>/".into());
+
+		let test_txo = OutPoint {
+			txid: Txid::from_hex("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap(),
+			index: 0
+		};
+		match store.persist_new_channel(test_txo, &added_monitors[0].1, update_id.2) {
+			ChannelMonitorUpdateStatus::PermanentFailure => {},
+			_ => panic!("unexpected result from persisting new channel")
+		}
+
+		nodes[1].node.get_and_clear_pending_msg_events();
+		added_monitors.clear();
+	}
 }
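For orientation, the tests above drive `FilesystemStore` through the blanket `Persist` impl; the underlying `KVStore` surface itself is small. A hedged sketch of direct `KVStore` usage against a `FilesystemStore` follows; the "example_data" directory and key names are assumptions made for the example.

use lightning::util::persist::KVStore;
use lightning_persister::fs_store::FilesystemStore;

fn main() -> std::io::Result<()> {
	// Write, read back, list, and remove a value under the "monitors" namespace.
	let store = FilesystemStore::new("example_data".into());
	store.write("monitors", "", "deadbeef_0", b"serialized-monitor-bytes")?;
	assert_eq!(store.read("monitors", "", "deadbeef_0")?, b"serialized-monitor-bytes".to_vec());
	assert_eq!(store.list("monitors", "")?, vec!["deadbeef_0".to_string()]);
	store.remove("monitors", "", "deadbeef_0", false)?;
	Ok(())
}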
diff --git a/lightning-persister/src/test_utils.rs b/lightning-persister/src/test_utils.rs
index 357a8b2ee8c..91557500f3d 100644
--- a/lightning-persister/src/test_utils.rs
+++ b/lightning-persister/src/test_utils.rs
@@ -1,4 +1,11 @@
-use lightning::util::persist::{KVStore, KVSTORE_NAMESPACE_KEY_MAX_LEN};
+use lightning::util::persist::{KVStore, KVSTORE_NAMESPACE_KEY_MAX_LEN, read_channel_monitors};
+use lightning::ln::functional_test_utils::{connect_block, create_announced_chan_between_nodes,
+	create_chanmon_cfgs, create_dummy_block, create_network, create_node_cfgs, create_node_chanmgrs,
+	send_payment};
+use lightning::chain::channelmonitor::CLOSED_CHANNEL_UPDATE_ID;
+use lightning::util::test_utils;
+use lightning::{check_closed_broadcast, check_closed_event, check_added_monitors};
+use lightning::events::ClosureReason;
 
 use std::panic::RefUnwindSafe;
 
@@ -48,3 +55,67 @@ pub(crate) fn do_read_write_remove_list_persist<K: KVStore + RefUnwindSafe>(kv_s
 	let listed_keys = kv_store.list(&max_chars, &max_chars).unwrap();
 	assert_eq!(listed_keys.len(), 0);
 }
+
+// Integration-test the given KVStore implementation. Test relaying a few payments and check that
+// the persisted data is updated the appropriate number of times.
+pub(crate) fn do_test_store<K: KVStore>(store_0: &K, store_1: &K) {
+	let chanmon_cfgs = create_chanmon_cfgs(2);
+	let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+	let chain_mon_0 = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[0].chain_source), &chanmon_cfgs[0].tx_broadcaster, &chanmon_cfgs[0].logger, &chanmon_cfgs[0].fee_estimator, store_0, node_cfgs[0].keys_manager);
+	let chain_mon_1 = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[1].chain_source), &chanmon_cfgs[1].tx_broadcaster, &chanmon_cfgs[1].logger, &chanmon_cfgs[1].fee_estimator, store_1, node_cfgs[1].keys_manager);
+	node_cfgs[0].chain_monitor = chain_mon_0;
+	node_cfgs[1].chain_monitor = chain_mon_1;
+	let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+	let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+
+	// Check that the persisted channel data is empty before any channels are
+	// open.
+	let mut persisted_chan_data_0 = read_channel_monitors(store_0, nodes[0].keys_manager, nodes[0].keys_manager).unwrap();
+	assert_eq!(persisted_chan_data_0.len(), 0);
+	let mut persisted_chan_data_1 = read_channel_monitors(store_1, nodes[1].keys_manager, nodes[1].keys_manager).unwrap();
+	assert_eq!(persisted_chan_data_1.len(), 0);
+
+	// Helper to make sure the channel is on the expected update ID.
+	macro_rules! check_persisted_data {
+		($expected_update_id: expr) => {
+			persisted_chan_data_0 = read_channel_monitors(store_0, nodes[0].keys_manager, nodes[0].keys_manager).unwrap();
+			assert_eq!(persisted_chan_data_0.len(), 1);
+			for (_, mon) in persisted_chan_data_0.iter() {
+				assert_eq!(mon.get_latest_update_id(), $expected_update_id);
+			}
+			persisted_chan_data_1 = read_channel_monitors(store_1, nodes[1].keys_manager, nodes[1].keys_manager).unwrap();
+			assert_eq!(persisted_chan_data_1.len(), 1);
+			for (_, mon) in persisted_chan_data_1.iter() {
+				assert_eq!(mon.get_latest_update_id(), $expected_update_id);
+			}
+		}
+	}
+
+	// Create some initial channel and check that a channel was persisted.
+	let _ = create_announced_chan_between_nodes(&nodes, 0, 1);
+	check_persisted_data!(0);
+
+	// Send a few payments and make sure the monitors are updated to the latest.
+	send_payment(&nodes[0], &vec!(&nodes[1])[..], 8000000);
+	check_persisted_data!(5);
+	send_payment(&nodes[1], &vec!(&nodes[0])[..], 4000000);
+	check_persisted_data!(10);
+
+	// Force close because cooperative close doesn't result in any persisted
+	// updates.
+	nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
+	check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed, [nodes[1].node.get_our_node_id()], 100000);
+	check_closed_broadcast!(nodes[0], true);
+	check_added_monitors!(nodes[0], 1);
+
+	let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap();
+	assert_eq!(node_txn.len(), 1);
+
+	connect_block(&nodes[1], &create_dummy_block(nodes[0].best_block_hash(), 42, vec![node_txn[0].clone(), node_txn[0].clone()]));
+	check_closed_broadcast!(nodes[1], true);
+	check_closed_event!(nodes[1], 1, ClosureReason::CommitmentTxConfirmed, [nodes[0].node.get_our_node_id()], 100000);
+	check_added_monitors!(nodes[1], 1);
+
+	// Make sure everything is persisted as expected after close.
+	check_persisted_data!(CLOSED_CHANNEL_UPDATE_ID);
+}
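Since `do_test_store` and `do_read_write_remove_list_persist` are generic over `KVStore`, any future backend in this crate can reuse them. A hedged sketch of what such a test module could look like; `MyStore` is a hypothetical implementation, not something this patch adds.

#[cfg(test)]
mod my_store_tests {
	use super::MyStore; // hypothetical KVStore implementation under test
	use crate::test_utils::{do_read_write_remove_list_persist, do_test_store};

	#[test]
	fn my_store_round_trips_and_persists() {
		// Run the generic read/write/remove/list checks, then the payment-relay
		// integration test against two independent store instances.
		let store_0 = MyStore::new("my_store_test_0".into());
		let store_1 = MyStore::new("my_store_test_1".into());
		do_read_write_remove_list_persist(&store_0);
		do_test_store(&store_0, &store_1);
	}
}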

From 7576c89adb7bf25c876fb4ec070dd71877a41896 Mon Sep 17 00:00:00 2001
From: Elias Rohrer <dev@tnull.de>
Date: Fri, 4 Aug 2023 11:34:45 +0200
Subject: [PATCH 8/9] Add benchmarking for `FilesystemStore`

We re-add benchmarking for `FilesystemStore` now that we have switched
over to it.
---
 bench/benches/bench.rs              |  1 +
 lightning-persister/src/fs_store.rs | 14 ++++++++++++++
 2 files changed, 15 insertions(+)

diff --git a/bench/benches/bench.rs b/bench/benches/bench.rs
index 3fc3abe687b..bc4bd010822 100644
--- a/bench/benches/bench.rs
+++ b/bench/benches/bench.rs
@@ -15,6 +15,7 @@ criterion_group!(benches,
 	lightning::routing::router::benches::generate_large_mpp_routes_with_probabilistic_scorer,
 	lightning::sign::benches::bench_get_secure_random_bytes,
 	lightning::ln::channelmanager::bench::bench_sends,
+	lightning_persister::fs_store::bench::bench_sends,
 	lightning_rapid_gossip_sync::bench::bench_reading_full_graph_from_file,
 	lightning::routing::gossip::benches::read_network_graph,
 	lightning::routing::gossip::benches::write_network_graph);
diff --git a/lightning-persister/src/fs_store.rs b/lightning-persister/src/fs_store.rs
index 638e74e6506..56d071da9f0 100644
--- a/lightning-persister/src/fs_store.rs
+++ b/lightning-persister/src/fs_store.rs
@@ -515,3 +515,17 @@ mod tests {
 		added_monitors.clear();
 	}
 }
+
+#[cfg(ldk_bench)]
+/// Benches
+pub mod bench {
+	use criterion::Criterion;
+
+	/// Bench!
+	pub fn bench_sends(bench: &mut Criterion) {
+		let store_a = super::FilesystemStore::new("bench_filesystem_store_a".into());
+		let store_b = super::FilesystemStore::new("bench_filesystem_store_b".into());
+		lightning::ln::channelmanager::bench::bench_two_sends(
+			bench, "bench_filesystem_persisted_sends", store_a, store_b);
+	}
+}
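The bench added here reuses `bench_two_sends` and only compiles under the `ldk_bench` cfg via the registration in `bench/benches/bench.rs`. For comparison, here is a standalone sketch of benchmarking raw `KVStore::write` throughput with criterion; it is illustrative rather than part of this patch, and the benchmark ids, directory name, and buffer size are assumptions.

use criterion::{criterion_group, criterion_main, Criterion};
use lightning::util::persist::KVStore;
use lightning_persister::fs_store::FilesystemStore;

fn bench_store_writes(c: &mut Criterion) {
	// Repeatedly overwrite a single 4 KiB value to gauge fsync-bound write latency.
	let store = FilesystemStore::new("bench_store_writes_data".into());
	let buf = vec![0u8; 4096];
	c.bench_function("filesystem_store_write_4k", |b| {
		b.iter(|| store.write("bench", "", "key", &buf).unwrap())
	});
}

criterion_group!(benches, bench_store_writes);
criterion_main!(benches);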

From 7a656719af1cf1d124821eb385a7d88f792660b4 Mon Sep 17 00:00:00 2001
From: Elias Rohrer <dev@tnull.de>
Date: Mon, 21 Aug 2023 16:17:35 +0200
Subject: [PATCH 9/9] Add `TestStore` implementation of `KVStore`

---
 lightning/src/util/test_utils.rs | 92 ++++++++++++++++++++++++++++++++
 1 file changed, 92 insertions(+)
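Ahead of the diff, a hedged sketch of the kind of in-crate test the read-only mode of this `TestStore` enables; the test below is illustrative and not part of the patch.

#[cfg(test)]
mod test_store_usage {
	use crate::io;
	use crate::util::persist::KVStore;
	use crate::util::test_utils::TestStore;

	#[test]
	fn read_only_store_rejects_writes() {
		// A read-only TestStore surfaces PermissionDenied, letting tests exercise the
		// PermanentFailure paths of the blanket `Persist` impl without touching the filesystem.
		let store = TestStore::new(true);
		let err = store.write("monitors", "", "some_key", &[0u8; 32]).unwrap_err();
		assert_eq!(err.kind(), io::ErrorKind::PermissionDenied);
	}
}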

diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs
index 7a9ce06910b..8e2be87d8be 100644
--- a/lightning/src/util/test_utils.rs
+++ b/lightning/src/util/test_utils.rs
@@ -38,6 +38,7 @@ use crate::util::config::UserConfig;
 use crate::util::test_channel_signer::{TestChannelSigner, EnforcementState};
 use crate::util::logger::{Logger, Level, Record};
 use crate::util::ser::{Readable, ReadableArgs, Writer, Writeable};
+use crate::util::persist::KVStore;
 
 use bitcoin::EcdsaSighashType;
 use bitcoin::blockdata::constants::ChainHash;
@@ -425,6 +426,97 @@ impl<Signer: sign::WriteableEcdsaChannelSigner> chainmonitor::Persist<Signer> fo
 	}
 }
 
+pub(crate) struct TestStore {
+	persisted_bytes: Mutex<HashMap<String, HashMap<String, Vec<u8>>>>,
+	read_only: bool,
+}
+
+impl TestStore {
+	pub fn new(read_only: bool) -> Self {
+		let persisted_bytes = Mutex::new(HashMap::new());
+		Self { persisted_bytes, read_only }
+	}
+}
+
+impl KVStore for TestStore {
+	fn read(&self, namespace: &str, sub_namespace: &str, key: &str) -> io::Result<Vec<u8>> {
+		let persisted_lock = self.persisted_bytes.lock().unwrap();
+		let prefixed = if sub_namespace.is_empty() {
+			namespace.to_string()
+		} else {
+			format!("{}/{}", namespace, sub_namespace)
+		};
+
+		if let Some(outer_ref) = persisted_lock.get(&prefixed) {
+			if let Some(inner_ref) = outer_ref.get(key) {
+				let bytes = inner_ref.clone();
+				Ok(bytes)
+			} else {
+				Err(io::Error::new(io::ErrorKind::NotFound, "Key not found"))
+			}
+		} else {
+			Err(io::Error::new(io::ErrorKind::NotFound, "Namespace not found"))
+		}
+	}
+
+	fn write(&self, namespace: &str, sub_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> {
+		if self.read_only {
+			return Err(io::Error::new(
+				io::ErrorKind::PermissionDenied,
+				"Cannot modify read-only store",
+			));
+		}
+		let mut persisted_lock = self.persisted_bytes.lock().unwrap();
+
+		let prefixed = if sub_namespace.is_empty() {
+			namespace.to_string()
+		} else {
+			format!("{}/{}", namespace, sub_namespace)
+		};
+		let outer_e = persisted_lock.entry(prefixed).or_insert(HashMap::new());
+		let mut bytes = Vec::new();
+		bytes.write_all(buf)?;
+		outer_e.insert(key.to_string(), bytes);
+		Ok(())
+	}
+
+	fn remove(&self, namespace: &str, sub_namespace: &str, key: &str, _lazy: bool) -> io::Result<()> {
+		if self.read_only {
+			return Err(io::Error::new(
+				io::ErrorKind::PermissionDenied,
+				"Cannot modify read-only store",
+			));
+		}
+
+		let mut persisted_lock = self.persisted_bytes.lock().unwrap();
+
+		let prefixed = if sub_namespace.is_empty() {
+			namespace.to_string()
+		} else {
+			format!("{}/{}", namespace, sub_namespace)
+		};
+		if let Some(outer_ref) = persisted_lock.get_mut(&prefixed) {
+			outer_ref.remove(&key.to_string());
+		}
+
+		Ok(())
+	}
+
+	fn list(&self, namespace: &str, sub_namespace: &str) -> io::Result<Vec<String>> {
+		let mut persisted_lock = self.persisted_bytes.lock().unwrap();
+
+		let prefixed = if sub_namespace.is_empty() {
+			namespace.to_string()
+		} else {
+			format!("{}/{}", namespace, sub_namespace)
+		};
+		match persisted_lock.entry(prefixed) {
+			hash_map::Entry::Occupied(e) => Ok(e.get().keys().cloned().collect()),
+			hash_map::Entry::Vacant(_) => Ok(Vec::new()),
+		}
+	}
+}
+
 pub struct TestBroadcaster {
 	pub txn_broadcasted: Mutex<Vec<Transaction>>,
 	pub blocks: Arc<Mutex<Vec<(Block, u32)>>>,