Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deadlock in ChannelManager's handle_error!() #568

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 54 additions & 107 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,21 +467,41 @@ pub struct ChannelDetails {
}

macro_rules! handle_error {
($self: ident, $internal: expr, $their_node_id: expr, $locked_channel_state: expr) => {
($self: ident, $internal: expr, $their_node_id: expr) => {
match $internal {
Ok(msg) => Ok(msg),
Err(MsgHandleErrInternal { err, shutdown_finish }) => {
#[cfg(debug_assertions)]
{
// In testing, ensure there are no deadlocks where the lock is already held upon
// entering the macro.
assert!($self.channel_state.try_lock().is_ok());
}

let mut msg_events = Vec::with_capacity(2);

if let Some((shutdown_res, update_option)) = shutdown_finish {
$self.finish_force_close_channel(shutdown_res);
if let Some(update) = update_option {
$locked_channel_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
}

log_error!($self, "{}", err.err);
if let msgs::ErrorAction::IgnoreError = err.action {
} else { $locked_channel_state.pending_msg_events.push(events::MessageSendEvent::HandleError { node_id: $their_node_id, action: err.action.clone() }); }
} else {
msg_events.push(events::MessageSendEvent::HandleError {
node_id: $their_node_id,
action: err.action.clone()
});
}

if !msg_events.is_empty() {
$self.channel_state.lock().unwrap().pending_msg_events.append(&mut msg_events);
}

// Return the error in case a higher-level API needs it
Err(err)
},
Expand Down Expand Up @@ -1191,9 +1211,8 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan

let _ = self.total_consistency_lock.read().unwrap();

let mut channel_lock = self.channel_state.lock().unwrap();
let err: Result<(), _> = loop {

let mut channel_lock = self.channel_state.lock().unwrap();
let id = match channel_lock.short_to_id.get(&route.hops.first().unwrap().short_channel_id) {
None => return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!"}),
Some(id) => id.clone(),
Expand Down Expand Up @@ -1242,7 +1261,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
return Ok(());
};

match handle_error!(self, err, route.hops.first().unwrap().pubkey, channel_lock) {
match handle_error!(self, err, route.hops.first().unwrap().pubkey) {
Ok(_) => unreachable!(),
Err(e) => { Err(APIError::ChannelUnavailable { err: e.err }) }
}
Expand All @@ -1261,8 +1280,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
let _ = self.total_consistency_lock.read().unwrap();

let (mut chan, msg, chan_monitor) = {
let mut channel_state = self.channel_state.lock().unwrap();
let (res, chan) = match channel_state.by_id.remove(temporary_channel_id) {
let (res, chan) = match self.channel_state.lock().unwrap().by_id.remove(temporary_channel_id) {
Some(mut chan) => {
(chan.get_outbound_funding_created(funding_txo)
.map_err(|e| if let ChannelError::Close(msg) = e {
Expand All @@ -1272,7 +1290,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
},
None => return
};
match handle_error!(self, res, chan.get_their_node_id(), channel_state) {
match handle_error!(self, res, chan.get_their_node_id()) {
Ok(funding_msg) => {
(chan, funding_msg.0, funding_msg.1)
},
Expand All @@ -1284,12 +1302,9 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
if let Err(e) = self.monitor.add_monitor(chan_monitor.get_funding_txo().unwrap(), chan_monitor) {
match e {
ChannelMonitorUpdateErr::PermanentFailure => {
{
let mut channel_state = self.channel_state.lock().unwrap();
match handle_error!(self, Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure", *temporary_channel_id, chan.force_shutdown(true), None)), chan.get_their_node_id(), channel_state) {
Err(_) => { return; },
Ok(()) => unreachable!(),
}
match handle_error!(self, Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure", *temporary_channel_id, chan.force_shutdown(true), None)), chan.get_their_node_id()) {
Err(_) => { return; },
Ok(()) => unreachable!(),
}
},
ChannelMonitorUpdateErr::TemporaryFailure => {
Expand Down Expand Up @@ -1520,10 +1535,8 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
},
ChannelError::CloseDelayBroadcast { .. } => { panic!("Wait is only generated on receipt of channel_reestablish, which is handled by try_chan_entry, we don't bother to support it here"); }
};
match handle_error!(self, err, their_node_id, channel_state) {
Ok(_) => unreachable!(),
Err(_) => { continue; },
}
handle_errors.push((their_node_id, err));
continue;
}
};
if let Err(e) = self.monitor.update_monitor(chan.get().get_funding_txo().unwrap(), monitor_update) {
Expand Down Expand Up @@ -1579,11 +1592,8 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
};
}

if handle_errors.len() > 0 {
let mut channel_state_lock = self.channel_state.lock().unwrap();
for (their_node_id, err) in handle_errors.drain(..) {
let _ = handle_error!(self, err, their_node_id, channel_state_lock);
}
for (their_node_id, err) in handle_errors.drain(..) {
let _ = handle_error!(self, err, their_node_id);
}

if new_events.is_empty() { return }
Expand Down Expand Up @@ -1835,7 +1845,8 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
return;
};

let _ = handle_error!(self, err, their_node_id, channel_state_lock);
mem::drop(channel_state_lock);
let _ = handle_error!(self, err, their_node_id);
}

/// Gets the node_id held by this ChannelManager
Expand Down Expand Up @@ -2579,9 +2590,9 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
#[doc(hidden)]
pub fn update_fee(&self, channel_id: [u8;32], feerate_per_kw: u64) -> Result<(), APIError> {
let _ = self.total_consistency_lock.read().unwrap();
let mut channel_state_lock = self.channel_state.lock().unwrap();
let their_node_id;
let err: Result<(), _> = loop {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let channel_state = &mut *channel_state_lock;

match channel_state.by_id.entry(channel_id) {
Expand Down Expand Up @@ -2620,7 +2631,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref> ChannelMan
return Ok(())
};

match handle_error!(self, err, their_node_id, channel_state_lock) {
match handle_error!(self, err, their_node_id) {
Ok(_) => unreachable!(),
Err(e) => { Err(APIError::APIMisuseError { err: e.err })}
}
Expand Down Expand Up @@ -2830,146 +2841,82 @@ impl<ChanSigner: ChannelKeys, M: Deref + Sync + Send, T: Deref + Sync + Send, K:
{
fn handle_open_channel(&self, their_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::OpenChannel) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_open_channel(their_node_id, their_features, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_open_channel(their_node_id, their_features, msg), *their_node_id);
}

fn handle_accept_channel(&self, their_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::AcceptChannel) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_accept_channel(their_node_id, their_features, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_accept_channel(their_node_id, their_features, msg), *their_node_id);
}

fn handle_funding_created(&self, their_node_id: &PublicKey, msg: &msgs::FundingCreated) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_funding_created(their_node_id, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_funding_created(their_node_id, msg), *their_node_id);
}

fn handle_funding_signed(&self, their_node_id: &PublicKey, msg: &msgs::FundingSigned) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_funding_signed(their_node_id, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_funding_signed(their_node_id, msg), *their_node_id);
}

fn handle_funding_locked(&self, their_node_id: &PublicKey, msg: &msgs::FundingLocked) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_funding_locked(their_node_id, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_funding_locked(their_node_id, msg), *their_node_id);
}

fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_shutdown(their_node_id, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_shutdown(their_node_id, msg), *their_node_id);
}

fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_closing_signed(their_node_id, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_closing_signed(their_node_id, msg), *their_node_id);
}

fn handle_update_add_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_update_add_htlc(their_node_id, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_update_add_htlc(their_node_id, msg), *their_node_id);
}

fn handle_update_fulfill_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_update_fulfill_htlc(their_node_id, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_update_fulfill_htlc(their_node_id, msg), *their_node_id);
}

fn handle_update_fail_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_update_fail_htlc(their_node_id, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_update_fail_htlc(their_node_id, msg), *their_node_id);
}

fn handle_update_fail_malformed_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_update_fail_malformed_htlc(their_node_id, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_update_fail_malformed_htlc(their_node_id, msg), *their_node_id);
}

fn handle_commitment_signed(&self, their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_commitment_signed(their_node_id, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_commitment_signed(their_node_id, msg), *their_node_id);
}

fn handle_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_revoke_and_ack(their_node_id, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_revoke_and_ack(their_node_id, msg), *their_node_id);
}

fn handle_update_fee(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFee) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_update_fee(their_node_id, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_update_fee(their_node_id, msg), *their_node_id);
}

fn handle_announcement_signatures(&self, their_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_announcement_signatures(their_node_id, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_announcement_signatures(their_node_id, msg), *their_node_id);
}

fn handle_channel_reestablish(&self, their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) {
let _ = self.total_consistency_lock.read().unwrap();
let res = self.internal_channel_reestablish(their_node_id, msg);
if res.is_err() {
let mut channel_state_lock = self.channel_state.lock().unwrap();
let _ = handle_error!(self, res, *their_node_id, channel_state_lock);
}
let _ = handle_error!(self, self.internal_channel_reestablish(their_node_id, msg), *their_node_id);
}

fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool) {
Expand Down
70 changes: 70 additions & 0 deletions lightning/src/ln/functional_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2988,6 +2988,76 @@ fn test_commitment_revoked_fail_backward_exhaustive_b() {
do_test_commitment_revoked_fail_backward_exhaustive(true, false, true);
}

/// Checks that an HTLC nodes[0] has not yet been able to send out is failed backward when the
/// channel is failed by an invalid `update_add_htlc` from the peer.
///
/// Flow (nodes[0] is "Alice", nodes[1] is "Bob"):
/// 1. Alice sends a payment; the resulting message event is fetched from Alice's queue but is
///    never delivered to Bob, so Bob never responds.
/// 2. Alice sends a second payment, which yields no added monitor and no message events.
/// 3. A hand-built `update_add_htlc` with `amount_msat: 0` is delivered to Alice.
/// 4. Alice is expected to report the second payment as failed, pass the closed-channel
///    broadcast check, and show one added monitor.
///
/// NOTE(review): presumably a regression test for error handling that needs to take the
/// channel_state lock while failing a channel with pending HTLCs — confirm against the change
/// that introduced it.
#[test]
fn fail_backward_pending_htlc_upon_channel_failure() {
// Standard two-node test network.
let chanmon_cfgs = create_chanmon_cfgs(2);
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
// Channel between node 0 and node 1.
// NOTE(review): 1_000_000 / 500_000_000 are presumably the channel value (sat) and the pushed
// amount (msat) — confirm against the helper's signature.
let chan = create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 500_000_000, InitFeatures::supported(), InitFeatures::supported());

// Alice -> Bob: Route a payment but without Bob sending revoke_and_ack.
{
let (_, payment_hash) = get_payment_preimage_hash!(nodes[0]);
let route = nodes[0].router.get_route(&nodes[1].node.get_our_node_id(), None, &Vec::new(), 50_000, TEST_FINAL_CLTV).unwrap();
nodes[0].node.send_payment(route, payment_hash).unwrap();
check_added_monitors!(nodes[0], 1);

// Drain Alice's outbound queue: exactly one event, addressed to Bob, carrying one message.
// The event is only inspected here; it is never handed to nodes[1].
let payment_event = {
let mut events = nodes[0].node.get_and_clear_pending_msg_events();
assert_eq!(events.len(), 1);
SendEvent::from_event(events.remove(0))
};
assert_eq!(payment_event.node_id, nodes[1].node.get_our_node_id());
assert_eq!(payment_event.msgs.len(), 1);
}

// Alice -> Bob: Route another payment but now Alice waits for Bob's earlier revoke_and_ack.
// The hash is bound outside the block because the final assertions need it.
let (_, failed_payment_hash) = get_payment_preimage_hash!(nodes[0]);
{
let route = nodes[0].router.get_route(&nodes[1].node.get_our_node_id(), None, &Vec::new(), 50_000, TEST_FINAL_CLTV).unwrap();
nodes[0].node.send_payment(route, failed_payment_hash).unwrap();
// Unlike the first payment, this one adds no monitor and queues no messages.
check_added_monitors!(nodes[0], 0);

assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
}

// Alice <- Bob: Send a malformed update_add_htlc so Alice fails the channel.
{
// Route in the reverse direction (Bob -> Alice); used only to build the onion below.
let route = nodes[1].router.get_route(&nodes[0].node.get_our_node_id(), None, &Vec::new(), 50_000, TEST_FINAL_CLTV).unwrap();
let (_, payment_hash) = get_payment_preimage_hash!(nodes[1]);

let secp_ctx = Secp256k1::new();
// Random 32-byte session key; the expect() panics if the bytes are not accepted as a
// secret key.
let session_priv = {
let mut session_key = [0; 32];
let mut rng = thread_rng();
rng.fill_bytes(&mut session_key);
SecretKey::from_slice(&session_key).expect("RNG is bad!")
};

// Build the onion by hand instead of going through send_payment, so the message fields
// below can be chosen freely. Height used is Bob's latest known block height plus one.
let current_height = nodes[1].node.latest_block_height.load(Ordering::Acquire) as u32 + 1;
let (onion_payloads, _amount_msat, cltv_expiry) = onion_utils::build_onion_payloads(&route, current_height).unwrap();
let onion_keys = onion_utils::construct_onion_keys(&secp_ctx, &route, &session_priv).unwrap();
let onion_routing_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, [0; 32], &payment_hash);

// Send a 0-msat update_add_htlc to fail the channel.
// The amount computed by build_onion_payloads is deliberately discarded (_amount_msat) and
// replaced with 0; chan.2 supplies the channel_id.
let update_add_htlc = msgs::UpdateAddHTLC {
channel_id: chan.2,
htlc_id: 0,
amount_msat: 0,
payment_hash,
cltv_expiry,
onion_routing_packet,
};
// Delivered straight to Alice's handler as if it came from Bob.
nodes[0].node.handle_update_add_htlc(&nodes[1].node.get_our_node_id(), &update_add_htlc);
}

// Check that Alice fails backward the pending HTLC from the second payment.
expect_payment_failed!(nodes[0], failed_payment_hash, true);
check_closed_broadcast!(nodes[0], true);
check_added_monitors!(nodes[0], 1);
}

#[test]
fn test_htlc_ignore_latest_remote_commitment() {
// Test that HTLC transactions spending the latest remote commitment transaction are simply
Expand Down