Skip to content

Commit 8658c7d

Browse files
committed
add persister that writes channel updates individually
1 parent aff67e5 commit 8658c7d

File tree

1 file changed

+326
-22
lines changed

1 file changed

+326
-22
lines changed

lightning/src/util/persist.rs

+326-22
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@
88
//! allows one to implement the persistence for [`ChannelManager`], [`NetworkGraph`],
99
//! and [`ChannelMonitor`] all in one place.
1010
11+
use core::convert::{TryFrom, TryInto};
1112
use core::ops::Deref;
1213
use bitcoin::hashes::hex::{FromHex, ToHex};
1314
use bitcoin::{BlockHash, Txid};
1415

1516
use crate::io;
17+
use crate::ln::msgs::DecodeError;
1618
use crate::prelude::{Vec, String};
1719
use crate::routing::scoring::WriteableScore;
1820

@@ -21,12 +23,12 @@ use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
2123
use crate::chain::chainmonitor::{Persist, MonitorUpdateId};
2224
use crate::sign::{EntropySource, NodeSigner, WriteableEcdsaChannelSigner, SignerProvider};
2325
use crate::chain::transaction::OutPoint;
24-
use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate};
26+
use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, CLOSED_CHANNEL_UPDATE_ID};
2527
use crate::ln::channelmanager::ChannelManager;
2628
use crate::routing::router::Router;
2729
use crate::routing::gossip::NetworkGraph;
2830
use crate::util::logger::Logger;
29-
use crate::util::ser::{ReadableArgs, Writeable};
31+
use crate::util::ser::{Readable, ReadableArgs, Writeable};
3032

3133
/// The namespace under which the [`ChannelManager`] will be persisted.
3234
pub const CHANNEL_MANAGER_PERSISTENCE_NAMESPACE: &str = "";
@@ -35,6 +37,8 @@ pub const CHANNEL_MANAGER_PERSISTENCE_KEY: &str = "manager";
3537

3638
/// The namespace under which [`ChannelMonitor`]s will be persisted.
3739
pub const CHANNEL_MONITOR_PERSISTENCE_NAMESPACE: &str = "monitors";
40+
/// The namespace under which [`ChannelMonitorUpdate`]s will be persisted.
41+
pub const CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE: &str = "monitors_updates";
3842

3943
/// The namespace under which the [`NetworkGraph`] will be persisted.
4044
pub const NETWORK_GRAPH_PERSISTENCE_NAMESPACE: &str = "";
@@ -126,28 +130,28 @@ impl<'a, A: KVStore, M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Der
126130
}
127131
}
128132

129-
impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStore> Persist<ChannelSigner> for K {
130-
// TODO: We really need a way for the persister to inform the user that its time to crash/shut
131-
// down once these start returning failure.
132-
// A PermanentFailure implies we should probably just shut down the node since we're
133-
// force-closing channels without even broadcasting!
133+
// impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStore> Persist<ChannelSigner> for K {
134+
// // TODO: We really need a way for the persister to inform the user that its time to crash/shut
135+
// // down once these start returning failure.
136+
// // A PermanentFailure implies we should probably just shut down the node since we're
137+
// // force-closing channels without even broadcasting!
134138

135-
fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
136-
let key = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
137-
match self.write(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE, &key, &monitor.encode()) {
138-
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
139-
Err(_) => chain::ChannelMonitorUpdateStatus::PermanentFailure,
140-
}
141-
}
139+
// fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
140+
// let key = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
141+
// match self.write(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE, &key, &monitor.encode()) {
142+
// Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
143+
// Err(_) => chain::ChannelMonitorUpdateStatus::PermanentFailure,
144+
// }
145+
// }
142146

143-
fn update_persisted_channel(&self, funding_txo: OutPoint, _update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
144-
let key = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
145-
match self.write(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE, &key, &monitor.encode()) {
146-
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
147-
Err(_) => chain::ChannelMonitorUpdateStatus::PermanentFailure,
148-
}
149-
}
150-
}
147+
// fn update_persisted_channel(&self, funding_txo: OutPoint, _update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
148+
// let key = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
149+
// match self.write(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE, &key, &monitor.encode()) {
150+
// Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
151+
// Err(_) => chain::ChannelMonitorUpdateStatus::PermanentFailure,
152+
// }
153+
// }
154+
// }
151155

152156
/// Read previously persisted [`ChannelMonitor`]s from the store.
153157
pub fn read_channel_monitors<K: Deref, ES: Deref, SP: Deref>(
@@ -194,3 +198,303 @@ where
194198
}
195199
Ok(res)
196200
}
201+
202+
enum KVStoreChannelMonitorReaderError {
203+
/// The monitor name was improperly formatted.
204+
BadMonitorName(String, String),
205+
/// The monitor could not be decoded.
206+
MonitorDecodeFailed(DecodeError, String),
207+
/// The update could not be decoded.
208+
UpdateDecodeFailed(DecodeError, String),
209+
/// Storage could not be read.
210+
StorageReadFailed(io::Error, String),
211+
/// An update could not be applied to a monitor.
212+
UpdateFailed(String, String),
213+
}
214+
215+
impl From<KVStoreChannelMonitorReaderError> for io::Error {
216+
fn from(value: KVStoreChannelMonitorReaderError) -> Self {
217+
match value {
218+
KVStoreChannelMonitorReaderError::BadMonitorName(reason, context) => {
219+
io::Error::new(io::ErrorKind::InvalidInput, format!("{reason}, context: {context}'"))
220+
},
221+
KVStoreChannelMonitorReaderError::MonitorDecodeFailed(reason, context) => {
222+
io::Error::new(io::ErrorKind::InvalidData, format!("{reason}, context: {context:?}'"))
223+
},
224+
KVStoreChannelMonitorReaderError::UpdateDecodeFailed(reason, context) => {
225+
io::Error::new(io::ErrorKind::InvalidData, format!("{reason}, context: {context:?}'"))
226+
},
227+
KVStoreChannelMonitorReaderError::StorageReadFailed(reason, context) => {
228+
io::Error::new(io::ErrorKind::InvalidData, format!("{reason}, context: {context:?}'"))
229+
},
230+
KVStoreChannelMonitorReaderError::UpdateFailed(reason, context) => {
231+
io::Error::new(io::ErrorKind::InvalidData, format!("{reason}, context: {context}'"))
232+
},
233+
}
234+
}
235+
}
236+
237+
/// A struct representing a name for a monitor.
238+
#[derive(Clone, Debug)]
239+
pub struct MonitorName(String);
240+
241+
impl TryFrom<MonitorName> for OutPoint {
242+
type Error = std::io::Error;
243+
244+
fn try_from(value: MonitorName) -> Result<Self, io::Error> {
245+
let (txid_hex, index) = value.0.split_once('_').ok_or_else(|| {
246+
KVStoreChannelMonitorReaderError::BadMonitorName("no underscore".to_string(), value.0.clone())
247+
})?;
248+
let index = index.parse().map_err(|e| {
249+
KVStoreChannelMonitorReaderError::BadMonitorName(
250+
format!("bad index value, caused by {e}"),
251+
value.0.clone(),
252+
)
253+
})?;
254+
let txid = Txid::from_hex(txid_hex).map_err(|e| {
255+
KVStoreChannelMonitorReaderError::BadMonitorName(
256+
format!("bad txid, caused by: {e}"),
257+
value.0.clone(),
258+
)
259+
})?;
260+
Ok(OutPoint { txid, index })
261+
}
262+
}
263+
264+
impl From<OutPoint> for MonitorName {
265+
fn from(value: OutPoint) -> Self {
266+
MonitorName(format!("{}_{}", value.txid.to_hex(), value.index))
267+
}
268+
}
269+
270+
/// A struct representing a name for an update.
271+
#[derive(Clone, Debug)]
272+
pub struct UpdateName(String);
273+
274+
impl From<u64> for UpdateName {
275+
fn from(value: u64) -> Self {
276+
Self(format!("{:0>20}", value))
277+
}
278+
}
279+
280+
#[allow(clippy::type_complexity)]
281+
pub trait KVStoreChannelMonitorReader<K: KVStore> {
282+
fn read_channelmonitors<ES: Deref + Clone, SP: Deref + Clone, B: Deref, F: Deref + Clone, L: Deref>(
283+
&self, entropy_source: ES, signer_provider: SP, broadcaster: &B, fee_estimator: F,
284+
logger: &L,
285+
) -> std::io::Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>>
286+
where
287+
ES::Target: EntropySource + Sized,
288+
SP::Target: SignerProvider + Sized,
289+
B::Target: BroadcasterInterface,
290+
F::Target: FeeEstimator,
291+
L::Target: Logger;
292+
/// List all the names of monitors.
293+
fn list_monitor_names(&self) -> io::Result<Vec<MonitorName>>;
294+
/// Key to a specific monitor.
295+
fn monitor_key(&self, monitor_name: &MonitorName) -> String;
296+
/// Deserialize a channel monitor.
297+
fn deserialize_monitor<ES: Deref, SP: Deref>(
298+
&self, entropy_source: ES, signer_provider: SP, monitor_name: MonitorName,
299+
) -> io::Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>
300+
where
301+
ES::Target: EntropySource + Sized,
302+
SP::Target: SignerProvider + Sized;
303+
/// List all the names of updates corresponding to a given monitor name.
304+
fn list_update_names(&self, monitor_name: &MonitorName) -> io::Result<Vec<UpdateName>>;
305+
/// Path to corresponding update directory for a given monitor name.
306+
fn path_to_monitor_updates(&self, monitor_name: &MonitorName) -> String;
307+
/// Deserialize a channel monitor update.
308+
fn deserialize_monitor_update(
309+
&self, monitor_name: &MonitorName, update_name: &UpdateName,
310+
) -> io::Result<ChannelMonitorUpdate>;
311+
/// Key to a specific update.
312+
fn update_key(&self, monitor_name: &MonitorName, update_name: &UpdateName) -> String;
313+
/// Delete updates with an update_id lower than the given channel monitor.
314+
fn delete_stale_updates<ChannelSigner: WriteableEcdsaChannelSigner>(
315+
&self, channel_id: OutPoint, monitor: &ChannelMonitor<ChannelSigner>,
316+
) -> io::Result<()>;
317+
}
318+
319+
impl<K: KVStore> KVStoreChannelMonitorReader<K> for K {
320+
fn read_channelmonitors<
321+
ES: Deref + Clone,
322+
SP: Deref + Clone,
323+
B: Deref,
324+
F: Deref + Clone,
325+
L: Deref,
326+
>(
327+
&self, entropy_source: ES, signer_provider: SP, broadcaster: &B, fee_estimator: F,
328+
logger: &L,
329+
) -> std::io::Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>>
330+
where
331+
ES::Target: EntropySource + Sized,
332+
SP::Target: SignerProvider + Sized,
333+
B::Target: BroadcasterInterface,
334+
F::Target: FeeEstimator,
335+
L::Target: Logger
336+
{
337+
let mut res = Vec::new();
338+
// for each monitor...
339+
for monitor_name in self.list_monitor_names()? {
340+
// ...parse the monitor
341+
let (bh, monitor) = self.deserialize_monitor(
342+
entropy_source.clone(),
343+
signer_provider.clone(),
344+
monitor_name.clone(),
345+
)?;
346+
// ...parse and apply the updates with an id higher than the monitor.
347+
for update_name in self.list_update_names(&monitor_name)? {
348+
let update = self.deserialize_monitor_update(&monitor_name, &update_name)?;
349+
if update.update_id == CLOSED_CHANNEL_UPDATE_ID
350+
|| update.update_id > monitor.get_latest_update_id()
351+
{
352+
monitor
353+
.update_monitor(&update, broadcaster, fee_estimator.clone(), logger)
354+
.map_err(|_| {
355+
KVStoreChannelMonitorReaderError::UpdateFailed(
356+
"update_monitor returned Err(())".to_string(),
357+
format!("monitor: {:?}", monitor_name),
358+
)
359+
})?;
360+
}
361+
}
362+
// ...push the result into the return vec
363+
res.push((bh, monitor))
364+
}
365+
Ok(res)
366+
}
367+
368+
/// Key to a specific monitor.
369+
fn monitor_key(&self, monitor_name: &MonitorName) -> String {
370+
CHANNEL_MONITOR_PERSISTENCE_NAMESPACE.to_owned() + &monitor_name.0
371+
}
372+
373+
/// Key to a specific update.
374+
fn update_key(&self, monitor_name: &MonitorName, update_name: &UpdateName) -> String {
375+
self.path_to_monitor_updates(monitor_name) + &update_name.0
376+
}
377+
378+
/// List all the names of monitors.
379+
fn list_monitor_names(&self) -> io::Result<Vec<MonitorName>> {
380+
Ok(self.list(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE)?.into_iter().map(MonitorName).collect())
381+
}
382+
383+
/// List all the names of updates corresponding to a given monitor name.
384+
fn list_update_names(&self, monitor_name: &MonitorName) -> io::Result<Vec<UpdateName>> {
385+
let update_dir_path = self.path_to_monitor_updates(monitor_name);
386+
Ok(self.list(&update_dir_path)?.into_iter().map(UpdateName).collect())
387+
}
388+
389+
/// Path to corresponding update directory for a given monitor name.
390+
fn path_to_monitor_updates(&self, monitor_name: &MonitorName) -> String {
391+
CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE.to_owned() + &monitor_name.0
392+
}
393+
394+
/// Deserialize a channel monitor.
395+
fn deserialize_monitor<ES: Deref, SP: Deref>(
396+
&self, entropy_source: ES, signer_provider: SP, monitor_name: MonitorName,
397+
) -> io::Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>
398+
where
399+
ES::Target: EntropySource + Sized,
400+
SP::Target: SignerProvider + Sized
401+
{
402+
let key = self.monitor_key(&monitor_name);
403+
let outpoint: OutPoint = monitor_name.try_into()?;
404+
match <(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>::read(
405+
&mut self.read(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE, &key)
406+
.map_err(|e| KVStoreChannelMonitorReaderError::StorageReadFailed(e, key.to_owned()))?,
407+
(&*entropy_source, &*signer_provider),
408+
) {
409+
Ok((blockhash, channel_monitor)) => {
410+
if channel_monitor.get_funding_txo().0.txid != outpoint.txid
411+
|| channel_monitor.get_funding_txo().0.index != outpoint.index
412+
{
413+
return Err(KVStoreChannelMonitorReaderError::MonitorDecodeFailed(
414+
DecodeError::InvalidValue,
415+
key,
416+
)
417+
.into());
418+
}
419+
Ok((blockhash, channel_monitor))
420+
}
421+
Err(e) => Err(KVStoreChannelMonitorReaderError::MonitorDecodeFailed(e, key).into()),
422+
}
423+
}
424+
425+
/// Deserialize a channel monitor update.
426+
fn deserialize_monitor_update(
427+
&self, monitor_name: &MonitorName, update_name: &UpdateName,
428+
) -> io::Result<ChannelMonitorUpdate>
429+
{
430+
let key = self.update_key(monitor_name, update_name);
431+
Ok(ChannelMonitorUpdate::read(&mut self
432+
.read(CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE, &key)
433+
.map_err(|e| KVStoreChannelMonitorReaderError::StorageReadFailed(e, key.to_owned()))?)
434+
.map_err(|e| KVStoreChannelMonitorReaderError::UpdateDecodeFailed(e, key))?)
435+
}
436+
437+
/// Delete updates with an update_id lower than the given channel monitor.
438+
fn delete_stale_updates<ChannelSigner: WriteableEcdsaChannelSigner>(
439+
&self, channel_id: OutPoint, monitor: &ChannelMonitor<ChannelSigner>,
440+
) -> io::Result<()>
441+
{
442+
let monitor_name: MonitorName = channel_id.into();
443+
let update_names =
444+
self.list_update_names(&monitor_name)?;
445+
for update_name in update_names {
446+
let update =
447+
self.deserialize_monitor_update(&monitor_name, &update_name)?;
448+
if update.update_id != CLOSED_CHANNEL_UPDATE_ID
449+
&& update.update_id <= monitor.get_latest_update_id()
450+
{
451+
self.remove(CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE, &self.update_key(&monitor_name, &update_name))?;
452+
}
453+
}
454+
Ok(())
455+
}
456+
}
457+
458+
impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStore + KVStoreChannelMonitorReader<K>> Persist<ChannelSigner> for K {
459+
// TODO: We really need a way for the persister to inform the user that its time to crash/shut
460+
// down once these start returning failure.
461+
// A PermanentFailure implies we should probably just shut down the node since we're
462+
// force-closing channels without even broadcasting!
463+
fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus
464+
{
465+
let key = self.monitor_key(&funding_txo.into());
466+
match self.write(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE, &key, &monitor.encode()) {
467+
Ok(()) => {
468+
if let Err(_e) = self.delete_stale_updates(funding_txo, monitor) {
469+
// TODO(domz): what to do? seems like an error or panic is needed, but OTOH cleanup is technically optional
470+
//log_error!(self.logger, "error cleaning up channel monitor updates! {}", e);
471+
};
472+
chain::ChannelMonitorUpdateStatus::Completed
473+
},
474+
Err(_) => chain::ChannelMonitorUpdateStatus::PermanentFailure,
475+
}
476+
}
477+
478+
fn update_persisted_channel(
479+
&self, funding_txo: OutPoint, update: Option<&ChannelMonitorUpdate>,
480+
monitor: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId,
481+
) -> chain::ChannelMonitorUpdateStatus {
482+
match update {
483+
Some(update) => {
484+
// This is an update to the monitor, which we persist to apply on restart.
485+
// IMPORTANT: update_id: MonitorUpdateId is not to be confused with ChannelMonitorUpdate.update_id.
486+
// The first is an opaque identifier for this call (used for calling back write completion). The second
487+
// is the channel update sequence number.
488+
let key = self.update_key(&funding_txo.into(), &update.update_id.into());
489+
match self.write(CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE, &key, &update.encode()) {
490+
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
491+
Err(_) => chain::ChannelMonitorUpdateStatus::PermanentFailure,
492+
}
493+
}
494+
// A new block. Now we need to persist the entire new monitor and discard the old
495+
// updates.
496+
None => self.persist_new_channel(funding_txo, monitor, update_id),
497+
}
498+
}
499+
500+
}

0 commit comments

Comments
 (0)