diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs
index 14f2db55ce6..9c41259926d 100644
--- a/lightning/src/ln/channel.rs
+++ b/lightning/src/ln/channel.rs
@@ -1682,6 +1682,10 @@ pub(super) trait InteractivelyFunded<SP: Deref> where SP::Target: SignerProvider {
 	fn interactive_tx_constructor_mut(&mut self) -> &mut Option<InteractiveTxConstructor>;
 
+	fn interactive_tx_signing_session_mut(&mut self) -> &mut Option<InteractiveTxSigningSession>;
+
+	fn interactive_tx_signing_session_with_context_mut(&mut self) -> (&mut Option<InteractiveTxSigningSession>, &mut ChannelContext<SP>);
+
 	fn dual_funding_context(&self) -> &DualFundingChannelContext;
 
 	fn tx_add_input(&mut self, msg: &msgs::TxAddInput) -> InteractiveTxMessageSendResult {
@@ -1755,13 +1759,20 @@ pub(super) trait InteractivelyFunded<SP: Deref> where SP::Target: SignerProvider {
 	}
 
 	fn funding_tx_constructed<L: Deref>(
-		&mut self, signing_session: &mut InteractiveTxSigningSession, logger: &L
-	) -> Result<(msgs::CommitmentSigned, Option<Event>), ChannelError>
+		&mut self, logger: &L
+	) -> Result<(Option<msgs::CommitmentSigned>, Option<Event>), ChannelError>
 	where
 		L::Target: Logger
 	{
 		let our_funding_satoshis = self.dual_funding_context().our_funding_satoshis;
-		let context = self.context_mut();
+		let (signing_session, context) = match self.interactive_tx_signing_session_with_context_mut() {
+			(Some(signing_session), context) => (signing_session, context),
+			(None, _) => return Err(ChannelError::Close(
+				(
+					"No signing session available for commitment signing".to_owned(),
+					ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(false) },
+				))),
+		};
 
 		let mut output_index = None;
 		let expected_spk = context.get_funding_redeemscript().to_p2wsh();
@@ -1791,13 +1802,13 @@ pub(super) trait InteractivelyFunded<SP: Deref> where SP::Target: SignerProvider {
 
 		let commitment_signed = context.get_initial_commitment_signed(logger);
 		let commitment_signed = match commitment_signed {
-			Ok(commitment_signed) => {
+			Some(commitment_signed) => {
 				context.funding_transaction = Some(signing_session.unsigned_tx.build_unsigned_tx());
 				commitment_signed
 			},
-			Err(err) => {
+			None => {
 				context.channel_transaction_parameters.funding_outpoint = None;
-				return Err(ChannelError::Close((err.to_string(), ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(false) })))
+				return Ok((None, None));
 			},
 		};
 
@@ -1832,7 +1843,7 @@ pub(super) trait InteractivelyFunded<SP: Deref> where SP::Target: SignerProvider {
 		// Clear the interactive transaction constructor
 		self.interactive_tx_constructor_mut().take();
 
-		Ok((commitment_signed, funding_ready_for_sig_event))
+		Ok((Some(commitment_signed), funding_ready_for_sig_event))
 	}
 }
 
@@ -1849,6 +1860,12 @@ impl<SP: Deref> InteractivelyFunded<SP> for OutboundV2Channel<SP> where SP::Target: SignerProvider {
 	fn interactive_tx_constructor_mut(&mut self) -> &mut Option<InteractiveTxConstructor> {
 		&mut self.interactive_tx_constructor
 	}
+	fn interactive_tx_signing_session_mut(&mut self) -> &mut Option<InteractiveTxSigningSession> {
+		&mut self.signing_session
+	}
+	fn interactive_tx_signing_session_with_context_mut(&mut self) -> (&mut Option<InteractiveTxSigningSession>, &mut ChannelContext<SP>) {
+		(&mut self.signing_session, &mut self.context)
+	}
 }
 
 impl<SP: Deref> InteractivelyFunded<SP> for InboundV2Channel<SP> where SP::Target: SignerProvider {
@@ -1864,6 +1881,12 @@ impl<SP: Deref> InteractivelyFunded<SP> for InboundV2Channel<SP> where SP::Target: SignerProvider {
 	fn interactive_tx_constructor_mut(&mut self) -> &mut Option<InteractiveTxConstructor> {
 		&mut self.interactive_tx_constructor
 	}
+	fn interactive_tx_signing_session_mut(&mut self) -> &mut Option<InteractiveTxSigningSession> {
+		&mut self.signing_session
+	}
+	fn interactive_tx_signing_session_with_context_mut(&mut self) -> (&mut Option<InteractiveTxSigningSession>, &mut ChannelContext<SP>) {
+		(&mut self.signing_session, &mut self.context)
+	}
 }
 
 impl<SP: Deref> ChannelContext<SP> where SP::Target: SignerProvider {
@@ -4013,7 +4036,7 @@ impl<SP: Deref> ChannelContext<SP> where SP::Target: SignerProvider {
 	fn get_initial_counterparty_commitment_signature<L: Deref>(
 		&self, logger: &L
-	) -> Result<Signature, ChannelError>
+	) -> Option<Signature>
 	where
 		SP::Target: SignerProvider,
 		L::Target: Logger
@@ -4026,11 +4049,7 @@ impl<SP: Deref> ChannelContext<SP> where SP::Target: SignerProvider {
 			ChannelSignerType::Ecdsa(ref ecdsa) => {
 				ecdsa.sign_counterparty_commitment(&counterparty_initial_commitment_tx, Vec::new(), Vec::new(), &self.secp_ctx)
 					.map(|(signature, _)| signature)
-					.map_err(|_| ChannelError::Close(
-						(
-							"Failed to get signatures for new commitment_signed".to_owned(),
-							ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(false) },
-						)))
+					.ok()
 			},
 			// TODO (taproot|arik)
 			#[cfg(taproot)]
@@ -4038,9 +4057,7 @@ impl<SP: Deref> ChannelContext<SP> where SP::Target: SignerProvider {
 		}
 	}
 
-	fn get_initial_commitment_signed<L: Deref>(
-		&mut self, logger: &L
-	) -> Result<msgs::CommitmentSigned, ChannelError>
+	fn get_initial_commitment_signed<L: Deref>(&mut self, logger: &L) -> Option<msgs::CommitmentSigned>
 	where
 		SP::Target: SignerProvider,
 		L::Target: Logger
@@ -4052,31 +4069,42 @@ impl<SP: Deref> ChannelContext<SP> where SP::Target: SignerProvider {
 		}
 		self.assert_no_commitment_advancement("initial commitment_signed");
 
-		let signature = match self.get_initial_counterparty_commitment_signature(logger) {
-			Ok(res) => res,
-			Err(e) => {
-				log_error!(logger, "Got bad signatures: {:?}!", e);
-				return Err(e);
-			}
-		};
+		match self.get_initial_counterparty_commitment_signature(logger) {
+			Some(signature) => {
+				if self.signer_pending_funding {
+					log_trace!(logger, "Counterparty commitment signature ready for initial commitment_signed message: clearing signer_pending_funding");
+					self.signer_pending_funding = false;
+				}
 
-		log_info!(logger, "Generated commitment_signed for peer for channel {}", &self.channel_id());
+				log_info!(logger, "Generated commitment_signed for peer for channel {}", &self.channel_id);
 
-		Ok(msgs::CommitmentSigned {
-			channel_id: self.channel_id,
-			htlc_signatures: vec![],
-			signature,
-			batch: None,
-			#[cfg(taproot)]
-			partial_signature_with_nonce: None,
-		})
+				Some(msgs::CommitmentSigned {
+					channel_id: self.channel_id,
+					htlc_signatures: vec![],
+					signature,
+					batch: None,
+					#[cfg(taproot)]
+					partial_signature_with_nonce: None,
+				})
+			},
+			None => {
+				#[cfg(not(async_signing))] {
+					panic!("Failed to get signature for initial commitment_signed");
+				}
+				#[cfg(async_signing)] {
+					log_trace!(logger, "Counterparty commitment signature not available for initial commitment_signed message; setting signer_pending_funding");
+					self.signer_pending_funding = true;
+					None
+				}
+			},
+		}
 	}
 
 	#[cfg(test)]
 	pub fn get_initial_counterparty_commitment_signature_for_test<L: Deref>(
 		&mut self, logger: &L, channel_transaction_parameters: ChannelTransactionParameters,
 		counterparty_cur_commitment_point_override: PublicKey,
-	) -> Result<Signature, ChannelError>
+	) -> Option<Signature>
 	where
 		SP::Target: SignerProvider,
 		L::Target: Logger
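The `Result` → `Option` switch above follows LDK's broader async-signing convention: a `None` from the signer is not a failure but a "signature not ready yet" signal, recorded in `signer_pending_funding` so the operation can be retried once the signer is unblocked. A minimal standalone sketch of that pattern (all names here are illustrative stand-ins, not the real `ChannelContext` API):

```rust
// A minimal sketch of the pending-signature pattern, assuming an async signer
// that may not have a signature available yet (all names hypothetical).
struct Channel {
    signer_pending_funding: bool,
}

impl Channel {
    // Stand-in for `sign_counterparty_commitment(..).ok()`.
    fn try_sign(&self) -> Option<[u8; 64]> {
        None // an async signer may answer later rather than now
    }

    fn get_commitment_signature(&mut self) -> Option<[u8; 64]> {
        match self.try_sign() {
            Some(sig) => {
                // Signature ready: clear any flag set by an earlier attempt.
                self.signer_pending_funding = false;
                Some(sig)
            },
            None => {
                // Not ready: remember to retry once the signer is unblocked.
                self.signer_pending_funding = true;
                None
            },
        }
    }
}

fn main() {
    let mut chan = Channel { signer_pending_funding: false };
    assert!(chan.get_commitment_signature().is_none());
    assert!(chan.signer_pending_funding); // retried later via `signer_unblocked`
}
```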
@@ -8723,6 +8751,7 @@ pub(super) struct OutboundV2Channel<SP: Deref> where SP::Target: SignerProvider {
 	pub dual_funding_context: DualFundingChannelContext,
 	/// The current interactive transaction construction session under negotiation.
 	interactive_tx_constructor: Option<InteractiveTxConstructor>,
+	signing_session: Option<InteractiveTxSigningSession>,
 }
 
 impl<SP: Deref> OutboundV2Channel<SP> where SP::Target: SignerProvider {
@@ -8782,6 +8811,7 @@ impl<SP: Deref> OutboundV2Channel<SP> where SP::Target: SignerProvider {
 				our_funding_inputs: funding_inputs,
 			},
 			interactive_tx_constructor: None,
+			signing_session: None,
 		};
 		Ok(chan)
 	}
@@ -8849,13 +8879,12 @@ impl<SP: Deref> OutboundV2Channel<SP> where SP::Target: SignerProvider {
 		}
 	}
 
-	pub fn into_channel(self, signing_session: InteractiveTxSigningSession) -> Result<Channel<SP>, ChannelError>{
-		let channel = Channel {
+	pub fn into_channel(self) -> Channel<SP> {
+		debug_assert!(self.signing_session.is_some());
+		Channel {
 			context: self.context,
-			interactive_tx_signing_session: Some(signing_session),
-		};
-
-		Ok(channel)
+			interactive_tx_signing_session: self.signing_session,
+		}
 	}
 }
 
@@ -8866,6 +8895,7 @@ pub(super) struct InboundV2Channel<SP: Deref> where SP::Target: SignerProvider {
 	pub dual_funding_context: DualFundingChannelContext,
 	/// The current interactive transaction construction session under negotiation.
 	interactive_tx_constructor: Option<InteractiveTxConstructor>,
+	signing_session: Option<InteractiveTxSigningSession>,
 }
 
 impl<SP: Deref> InboundV2Channel<SP> where SP::Target: SignerProvider {
@@ -8968,6 +8998,7 @@ impl<SP: Deref> InboundV2Channel<SP> where SP::Target: SignerProvider {
 			dual_funding_context,
 			interactive_tx_constructor,
 			unfunded_context: UnfundedChannelContext { unfunded_channel_age_ticks: 0 },
+			signing_session: None,
 		})
 	}
 
@@ -9043,13 +9074,12 @@ impl<SP: Deref> InboundV2Channel<SP> where SP::Target: SignerProvider {
 		self.generate_accept_channel_v2_message()
 	}
 
-	pub fn into_channel(self, signing_session: InteractiveTxSigningSession) -> Result<Channel<SP>, ChannelError>{
-		let channel = Channel {
+	pub fn into_channel(self) -> Channel<SP> {
+		debug_assert!(self.signing_session.is_some());
+		Channel {
 			context: self.context,
-			interactive_tx_signing_session: Some(signing_session),
-		};
-
-		Ok(channel)
+			interactive_tx_signing_session: self.signing_session,
+		}
 	}
 }
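Since the signing session now lives on the unfunded channel itself, `into_channel` no longer takes the session as an argument and becomes infallible (guarded by a `debug_assert!`). A rough sketch of the resulting two-phase flow, with hypothetical stand-in types rather than LDK's:

```rust
// Hypothetical stand-ins for the unfunded/funded channel types; the real code
// stores an `InteractiveTxSigningSession` on `OutboundV2Channel`/`InboundV2Channel`.
struct SigningSession;

struct UnfundedChannel {
    signing_session: Option<SigningSession>,
}

struct FundedChannel {
    signing_session: Option<SigningSession>,
}

impl UnfundedChannel {
    // Phase 1: stash the negotiated session on the channel itself.
    fn set_signing_session(&mut self, session: SigningSession) {
        self.signing_session = Some(session);
    }

    // Phase 2: promote to a funded channel; mirrors the argument-less,
    // infallible `into_channel()`.
    fn into_funded(self) -> FundedChannel {
        debug_assert!(self.signing_session.is_some());
        FundedChannel { signing_session: self.signing_session }
    }
}

fn main() {
    let mut chan = UnfundedChannel { signing_session: None };
    chan.set_signing_session(SigningSession);
    let funded = chan.into_funded();
    assert!(funded.signing_session.is_some());
}
```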
diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index 86faeac9a68..aaa239aec8e 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -3145,7 +3145,7 @@ macro_rules! emit_channel_ready_event {
 macro_rules! handle_monitor_update_completion {
 	($self: ident, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => { {
 		let logger = WithChannelContext::from(&$self.logger, &$chan.context, None);
-		let mut updates = $chan.monitor_updating_restored(&&logger,
+		let updates = $chan.monitor_updating_restored(&&logger,
 			&$self.node_signer, $self.chain_hash, &$self.default_configuration,
 			$self.best_block.read().unwrap().height);
 		let counterparty_node_id = $chan.context.get_counterparty_node_id();
@@ -3233,7 +3233,7 @@ macro_rules! handle_monitor_update_completion {
 			$self.push_decode_update_add_htlcs(decode);
 		}
 		$self.finalize_claims(updates.finalized_claimed_htlcs);
-		for failure in updates.failed_htlcs.drain(..) {
+		for failure in updates.failed_htlcs {
 			let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id };
 			$self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver);
 		}
@@ -3775,7 +3775,7 @@ where
 			}
 		}
 
-		for htlc_source in failed_htlcs.drain(..) {
+		for htlc_source in failed_htlcs {
 			let reason = HTLCFailReason::from_failure_code(0x4000 | 8);
 			let receiver = HTLCDestination::NextHopChannel { node_id: Some(*counterparty_node_id), channel_id: *channel_id };
 			self.fail_htlc_backwards_internal(&htlc_source.0, &htlc_source.1, &reason, receiver);
@@ -3911,7 +3911,7 @@ where
 	/// the channel-closing action,
 	/// (b) this needs to be called without holding any locks (except
 	/// [`ChannelManager::total_consistency_lock`].
-	fn finish_close_channel(&self, mut shutdown_res: ShutdownResult) {
+	fn finish_close_channel(&self, shutdown_res: ShutdownResult) {
 		debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread);
 		#[cfg(debug_assertions)]
 		for (_, peer) in self.per_peer_state.read().unwrap().iter() {
@@ -3924,7 +3924,7 @@ where
 		log_debug!(logger, "Finishing closure of channel due to {} with {} HTLCs to fail",
 			shutdown_res.closure_reason, shutdown_res.dropped_outbound_htlcs.len());
-		for htlc_source in shutdown_res.dropped_outbound_htlcs.drain(..) {
+		for htlc_source in shutdown_res.dropped_outbound_htlcs {
 			let (source, payment_hash, counterparty_node_id, channel_id) = htlc_source;
 			let reason = HTLCFailReason::from_failure_code(0x4000 | 8);
 			let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id };
@@ -3985,7 +3985,7 @@ where
 				}, None));
 			}
 		}
-		for shutdown_result in shutdown_results.drain(..) {
+		for shutdown_result in shutdown_results {
 			self.finish_close_channel(shutdown_result);
 		}
 	}
@@ -5301,7 +5301,7 @@ where
 			}
 		}
 		mem::drop(funding_batch_states);
-		for shutdown_result in shutdown_results.drain(..) {
+		for shutdown_result in shutdown_results {
 			self.finish_close_channel(shutdown_result);
 		}
 	}
@@ -5657,10 +5657,10 @@ where
 		// proposed to as a batch.
 		let pending_forwards = (
 			incoming_scid, Some(incoming_counterparty_node_id), incoming_funding_txo,
-			incoming_channel_id, incoming_user_channel_id, htlc_forwards.drain(..).collect()
+			incoming_channel_id, incoming_user_channel_id, htlc_forwards,
 		);
 		self.forward_htlcs_without_forward_event(&mut [pending_forwards]);
-		for (htlc_fail, htlc_destination) in htlc_fails.drain(..) {
+		for (htlc_fail, htlc_destination) in htlc_fails {
 			let failure = match htlc_fail {
 				HTLCFailureMsg::Relay(fail_htlc) => HTLCForwardInfo::FailHTLC {
 					htlc_id: fail_htlc.htlc_id,
@@ -5697,7 +5697,7 @@ where
 		let mut forward_htlcs = new_hash_map();
 		mem::swap(&mut forward_htlcs, &mut self.forward_htlcs.lock().unwrap());
 
-		for (short_chan_id, mut pending_forwards) in forward_htlcs {
+		for (short_chan_id, pending_forwards) in forward_htlcs {
 			if short_chan_id != 0 {
 				let mut forwarding_counterparty = None;
 				macro_rules! forwarding_channel_not_found {
forwarding_channel_not_found { @@ -5814,7 +5814,7 @@ where let (counterparty_node_id, forward_chan_id) = match chan_info_opt { Some((cp_id, chan_id)) => (cp_id, chan_id), None => { - forwarding_channel_not_found!(pending_forwards.drain(..)); + forwarding_channel_not_found!(pending_forwards); continue; } }; @@ -5822,13 +5822,13 @@ where let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); if peer_state_mutex_opt.is_none() { - forwarding_channel_not_found!(pending_forwards.drain(..)); + forwarding_channel_not_found!(pending_forwards); continue; } let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; - let mut draining_pending_forwards = pending_forwards.drain(..); - while let Some(forward_info) = draining_pending_forwards.next() { + let mut pending_forwards_iter = pending_forwards.into_iter(); + while let Some(forward_info) = pending_forwards_iter.next() { let queue_fail_htlc_res = match forward_info { HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo { prev_short_channel_id, prev_htlc_id, prev_channel_id, prev_funding_outpoint, @@ -5886,7 +5886,7 @@ where if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) { chan } else { - forwarding_channel_not_found!(core::iter::once(forward_info).chain(draining_pending_forwards)); + forwarding_channel_not_found!(core::iter::once(forward_info).chain(pending_forwards_iter)); break; } } @@ -5918,7 +5918,7 @@ where HTLCDestination::NextHopChannel { node_id: Some(chan.context.get_counterparty_node_id()), channel_id: forward_chan_id } )); } else { - forwarding_channel_not_found!(core::iter::once(forward_info).chain(draining_pending_forwards)); + forwarding_channel_not_found!(core::iter::once(forward_info).chain(pending_forwards_iter)); break; } } @@ -5933,7 +5933,7 @@ where log_trace!(logger, "Failing HTLC back to channel with short id {} (backward HTLC ID {}) after delay", short_chan_id, htlc_id); Some((chan.queue_fail_htlc(htlc_id, err_packet.clone(), &&logger), htlc_id)) } else { - forwarding_channel_not_found!(core::iter::once(forward_info).chain(draining_pending_forwards)); + forwarding_channel_not_found!(core::iter::once(forward_info).chain(pending_forwards_iter)); break; } }, @@ -5946,7 +5946,7 @@ where ); Some((res, htlc_id)) } else { - forwarding_channel_not_found!(core::iter::once(forward_info).chain(draining_pending_forwards)); + forwarding_channel_not_found!(core::iter::once(forward_info).chain(pending_forwards_iter)); break; } }, @@ -5969,7 +5969,7 @@ where } } } else { - 'next_forwardable_htlc: for forward_info in pending_forwards.drain(..) { + 'next_forwardable_htlc: for forward_info in pending_forwards { match forward_info { HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo { prev_short_channel_id, prev_htlc_id, prev_channel_id, prev_funding_outpoint, @@ -6221,7 +6221,7 @@ where || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, best_block_height, &self.pending_events, &self.logger, |args| self.send_payment_along_path(args)); - for (htlc_source, payment_hash, failure_reason, destination) in failed_forwards.drain(..) { + for (htlc_source, payment_hash, failure_reason, destination) in failed_forwards { self.fail_htlc_backwards_internal(&htlc_source, &payment_hash, &failure_reason, destination); } self.forward_htlcs(&mut phantom_receives); @@ -6251,7 +6251,7 @@ where return NotifyOption::SkipPersistNoEvents; } - for event in background_events.drain(..) 
@@ -6251,7 +6251,7 @@ where
 			return NotifyOption::SkipPersistNoEvents;
 		}
 
-		for event in background_events.drain(..) {
+		for event in background_events {
 			match event {
 				BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((funding_txo, _channel_id, update)) => {
 					// The channel has already been closed, so no use bothering to care about the
@@ -6579,14 +6579,14 @@ where
 			true
 		});
 
-		for htlc_source in timed_out_mpp_htlcs.drain(..) {
+		for htlc_source in timed_out_mpp_htlcs {
 			let source = HTLCSource::PreviousHopData(htlc_source.0.clone());
 			let reason = HTLCFailReason::from_failure_code(23);
 			let receiver = HTLCDestination::FailedPayment { payment_hash: htlc_source.1 };
 			self.fail_htlc_backwards_internal(&source, &htlc_source.1, &reason, receiver);
 		}
 
-		for (err, counterparty_node_id) in handle_errors.drain(..) {
+		for (err, counterparty_node_id) in handle_errors {
 			let _ = handle_error!(self, err, counterparty_node_id);
 		}
 
@@ -6724,7 +6724,7 @@ where
 	// failed backwards or, if they were one of our outgoing HTLCs, then their failure needs to
 	// be surfaced to the user.
 	fn fail_holding_cell_htlcs(
-		&self, mut htlcs_to_fail: Vec<(HTLCSource, PaymentHash)>, channel_id: ChannelId,
+		&self, htlcs_to_fail: Vec<(HTLCSource, PaymentHash)>, channel_id: ChannelId,
 		counterparty_node_id: &PublicKey
 	) {
 		let (failure_code, onion_failure_data) = {
@@ -6747,7 +6747,7 @@ where
 			} else { (0x4000|10, Vec::new()) }
 		};
 
-		for (htlc_src, payment_hash) in htlcs_to_fail.drain(..) {
+		for (htlc_src, payment_hash) in htlcs_to_fail {
 			let reason = HTLCFailReason::reason(failure_code, onion_failure_data.clone());
 			let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id.clone()), channel_id };
 			self.fail_htlc_backwards_internal(&htlc_src, &payment_hash, &reason, receiver);
@@ -6915,7 +6915,6 @@ where
 		let mut prev_total_msat = None;
 		let mut expected_amt_msat = None;
 		let mut valid_mpp = true;
-		let mut errs = Vec::new();
 		let per_peer_state = self.per_peer_state.read().unwrap();
 		for htlc in sources.iter() {
 			if prev_total_msat.is_some() && prev_total_msat != Some(htlc.total_msat) {
@@ -7002,12 +7001,6 @@ where
 			}
 			self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash);
 		}
-
-		// Now we can handle any errors which were generated.
-		for (counterparty_node_id, err) in errs.drain(..) {
-			let res: Result<(), _> = Err(err);
-			let _ = handle_error!(self, res, counterparty_node_id);
-		}
 	}
 
 	fn claim_funds_from_hop<
@@ -8275,7 +8268,78 @@ where
 			}
 		})
 	}
+}
 
+macro_rules! finish_tx_complete {
+	($self: ident, $counterparty_node_id: ident, $peer_state: ident, $chan_phase_entry: ident) => {
+		loop {
+			let result = match $chan_phase_entry.get_mut() {
+				ChannelPhase::UnfundedOutboundV2(chan) => {
+					chan.funding_tx_constructed(&$self.logger)
+				},
+				ChannelPhase::UnfundedInboundV2(chan) => {
+					chan.funding_tx_constructed(&$self.logger)
+				},
+				_ => unreachable!(),
+			};
+			let (commitment_signed_opt, funding_ready_for_sig_event_opt) = match result {
+				Ok((commitment_signed_opt, funding_ready_for_sig_event_opt)) => {
+					(commitment_signed_opt, funding_ready_for_sig_event_opt)
+				},
+				Err(e) => break Err(e),
+			};
+
+			// Check if the signer returned a result.
+			//
+			// TODO: This can be removed once ChannelPhase is refactored into Channel as the phase
+			// transition will happen internally.
+			if commitment_signed_opt.is_none() {
+				break Ok(());
+			}
+
+			let (channel_id, channel_phase) = $chan_phase_entry.remove_entry();
+			let channel = match channel_phase {
+				ChannelPhase::UnfundedOutboundV2(chan) => chan.into_channel(),
+				ChannelPhase::UnfundedInboundV2(chan) => chan.into_channel(),
+				_ => unreachable!(),
+			};
+			$peer_state.channel_by_id.insert(channel_id, ChannelPhase::Funded(channel));
+
+			if let Some(funding_ready_for_sig_event) = funding_ready_for_sig_event_opt {
+				let mut pending_events = $self.pending_events.lock().unwrap();
+				pending_events.push_back((funding_ready_for_sig_event, None));
+			}
+
+			if let Some(commitment_signed) = commitment_signed_opt {
+				$peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
+					node_id: $counterparty_node_id,
+					updates: CommitmentUpdate {
+						commitment_signed,
+						update_add_htlcs: vec![],
+						update_fulfill_htlcs: vec![],
+						update_fail_htlcs: vec![],
+						update_fail_malformed_htlcs: vec![],
+						update_fee: None,
+					},
+				});
+			}
+			break Ok(());
+		}
+	}
+}
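`finish_tx_complete!` uses `loop { ... break value }` for early-exit-with-value semantics, since a `return` inside the macro body would return from whatever function the macro expands into. A standalone illustration of the idiom:

```rust
// Generic sketch of the `loop`/`break value` idiom used by the macro above:
// several exit points, each yielding a value for the surrounding expression.
fn classify(n: i32) -> Result<&'static str, &'static str> {
    loop {
        if n < 0 {
            break Err("negative");
        }
        if n == 0 {
            break Ok("zero");
        }
        break Ok("positive");
    }
}

fn main() {
    assert_eq!(classify(3), Ok("positive"));
    assert_eq!(classify(-1), Err("negative"));
}
```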
+impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, MR: Deref, L: Deref> ChannelManager<M, T, ES, NS, SP, F, R, MR, L>
+where
+	M::Target: chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
+	T::Target: BroadcasterInterface,
+	ES::Target: EntropySource,
+	NS::Target: NodeSigner,
+	SP::Target: SignerProvider,
+	F::Target: FeeEstimator,
+	R::Target: Router,
+	MR::Target: MessageRouter,
+	L::Target: Logger,
+{
 	fn internal_tx_complete(&self, counterparty_node_id: PublicKey, msg: &msgs::TxComplete) -> Result<(), MsgHandleErrInternal> {
 		let per_peer_state = self.per_peer_state.read().unwrap();
 		let peer_state_mutex = per_peer_state.get(&counterparty_node_id)
@@ -8304,45 +8368,23 @@ where
 				if let Some(msg_send_event) = msg_send_event_opt {
 					peer_state.pending_msg_events.push(msg_send_event);
 				};
-				if let Some(mut signing_session) = signing_session_opt {
-					let (commitment_signed, funding_ready_for_sig_event_opt) = match chan_phase_entry.get_mut() {
+				if let Some(signing_session) = signing_session_opt {
+					match chan_phase_entry.get_mut() {
 						ChannelPhase::UnfundedOutboundV2(chan) => {
-							chan.funding_tx_constructed(&mut signing_session, &self.logger)
+							*chan.interactive_tx_signing_session_mut() = Some(signing_session);
 						},
 						ChannelPhase::UnfundedInboundV2(chan) => {
-							chan.funding_tx_constructed(&mut signing_session, &self.logger)
+							*chan.interactive_tx_signing_session_mut() = Some(signing_session);
 						},
-						_ => Err(ChannelError::Warn(
-							"Got a tx_complete message with no interactive transaction construction expected or in-progress"
-								.into())),
-					}.map_err(|err| MsgHandleErrInternal::send_err_msg_no_close(format!("{}", err), msg.channel_id))?;
-					let (channel_id, channel_phase) = chan_phase_entry.remove_entry();
-					let channel = match channel_phase {
-						ChannelPhase::UnfundedOutboundV2(chan) => chan.into_channel(signing_session),
-						ChannelPhase::UnfundedInboundV2(chan) => chan.into_channel(signing_session),
 						_ => {
-							debug_assert!(false); // It cannot be another variant as we are in the `Ok` branch of the above match.
-							Err(ChannelError::Warn(
-								"Got a tx_complete message with no interactive transaction construction expected or in-progress"
-									.into()))
+							let err = ChannelError::Warn(
+								"Got a tx_complete message with no interactive transaction construction expected or in-progress".into()
+							);
+							return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("{}", err), msg.channel_id));
 						},
-					}.map_err(|err| MsgHandleErrInternal::send_err_msg_no_close(format!("{}", err), msg.channel_id))?;
-					peer_state.channel_by_id.insert(channel_id, ChannelPhase::Funded(channel));
-					if let Some(funding_ready_for_sig_event) = funding_ready_for_sig_event_opt {
-						let mut pending_events = self.pending_events.lock().unwrap();
-						pending_events.push_back((funding_ready_for_sig_event, None));
 					}
-					peer_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-						node_id: counterparty_node_id,
-						updates: CommitmentUpdate {
-							commitment_signed,
-							update_add_htlcs: vec![],
-							update_fulfill_htlcs: vec![],
-							update_fail_htlcs: vec![],
-							update_fail_malformed_htlcs: vec![],
-							update_fee: None,
-						},
-					});
+					finish_tx_complete!(self, counterparty_node_id, peer_state, chan_phase_entry)
+						.map_err(|err| MsgHandleErrInternal::send_err_msg_no_close(format!("{}", err), msg.channel_id))?;
 				}
 				Ok(())
 			},
@@ -8573,7 +8615,7 @@ where
 				return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
 			}
 		}
-		for htlc_source in dropped_htlcs.drain(..) {
+		for htlc_source in dropped_htlcs {
 			let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id.clone()), channel_id: msg.channel_id };
 			let reason = HTLCFailReason::from_failure_code(0x4000 | 8);
 			self.fail_htlc_backwards_internal(&htlc_source.0, &htlc_source.1, &reason, receiver);
@@ -8980,7 +9022,7 @@ where
 			}
 		}
 
-		for (htlc_source, payment_hash, failure_reason, destination) in failed_intercept_forwards.drain(..) {
+		for (htlc_source, payment_hash, failure_reason, destination) in failed_intercept_forwards {
 			push_forward_event |= self.fail_htlc_backwards_internal_without_forward_event(&htlc_source, &payment_hash, &failure_reason, destination);
 		}
 
@@ -9302,10 +9344,10 @@ where
 		debug_assert!(self.total_consistency_lock.try_write().is_err()); // Caller holds read lock
 
 		let mut failed_channels = Vec::new();
-		let mut pending_monitor_events = self.chain_monitor.release_pending_monitor_events();
+		let pending_monitor_events = self.chain_monitor.release_pending_monitor_events();
 		let has_pending_monitor_events = !pending_monitor_events.is_empty();
-		for (funding_outpoint, channel_id, mut monitor_events, counterparty_node_id) in pending_monitor_events.drain(..) {
-			for monitor_event in monitor_events.drain(..) {
+		for (funding_outpoint, channel_id, monitor_events, counterparty_node_id) in pending_monitor_events {
+			for monitor_event in monitor_events {
 				match monitor_event {
 					MonitorEvent::HTLCEvent(htlc_update) => {
 						let logger = WithContext::from(&self.logger, counterparty_node_id, Some(channel_id), Some(htlc_update.payment_hash));
@@ -9369,7 +9411,7 @@ where
 			}
 		}
 
-		for failure in failed_channels.drain(..) {
+		for failure in failed_channels {
 			self.finish_close_channel(failure);
 		}
 
@@ -9427,7 +9469,7 @@ where
 		}
 
 		let has_update = has_monitor_update || !failed_htlcs.is_empty();
-		for (failures, channel_id, counterparty_node_id) in failed_htlcs.drain(..) {
+		for (failures, channel_id, counterparty_node_id) in failed_htlcs {
 			self.fail_holding_cell_htlcs(failures, channel_id, &counterparty_node_id);
 		}
 
@@ -9543,9 +9585,48 @@ where
 			});
 		}
 		drop(per_peer_state);
-		for shutdown_result in shutdown_results.drain(..) {
+		for shutdown_result in shutdown_results {
 			self.finish_close_channel(shutdown_result);
 		}
+
+		// Finish any tx_complete handling waiting on async signing.
+		//
+		// TODO: Move this into the earlier channel iteration to avoid duplication and the Vec
+		// allocation once ChannelPhase is refactored into Channel. This can't be avoided with the
+		// current data model because tx_complete handling requires removing the entry from the
+		// channel_by_id map and re-inserting it, which can't be done while iterating over the map.
+		let channels = match channel_opt {
+			Some((counterparty_node_id, channel_id)) => vec![(counterparty_node_id, channel_id)],
+			None => {
+				let per_peer_state = self.per_peer_state.read().unwrap();
+				let mut channels = Vec::with_capacity(per_peer_state.len());
+				for (counterparty_node_id, peer_state_mutex) in per_peer_state.iter() {
+					let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+					let peer_state = &mut *peer_state_lock;
+					for (channel_id, channel) in peer_state.channel_by_id.iter() {
+						if let ChannelPhase::UnfundedInboundV2(_) | ChannelPhase::UnfundedOutboundV2(_) = channel {
+							channels.push((*counterparty_node_id, *channel_id));
+						}
+					}
+				}
+				channels
+			}
+		};
+		for (counterparty_node_id, channel_id) in channels {
+			let per_peer_state = self.per_peer_state.read().unwrap();
+			if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) {
+				let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+				let peer_state = &mut *peer_state_lock;
+				if let hash_map::Entry::Occupied(mut chan_phase_entry) = peer_state.channel_by_id.entry(channel_id) {
+					match chan_phase_entry.get_mut() {
+						ChannelPhase::UnfundedInboundV2(_) | ChannelPhase::UnfundedOutboundV2(_) => {
+							let _ = finish_tx_complete!(self, counterparty_node_id, peer_state, chan_phase_entry);
+						},
+						_ => {},
+					}
+				}
+			}
+		}
 	}
 
 	/// Check whether any channels have finished removing all pending updates after a shutdown
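The `signer_unblocked` addition first collects the candidate (peer, channel) IDs and only then re-acquires the locks to mutate, because `finish_tx_complete!` must remove and re-insert `channel_by_id` entries, which cannot be done mid-iteration. A generic sketch of that collect-then-reborrow pattern (the map and states here are illustrative, not LDK types):

```rust
use std::collections::HashMap;

// Keys are gathered first because an entry cannot be removed and re-inserted
// while the map is being iterated over.
fn retry_pending(channels: &mut HashMap<u64, String>) {
    let pending: Vec<u64> = channels
        .iter()
        .filter(|(_, state)| state.as_str() == "unfunded")
        .map(|(id, _)| *id)
        .collect();

    for id in pending {
        if let Some(state) = channels.remove(&id) {
            // ...attempt to finish the handshake; re-insert in its new phase.
            channels.insert(id, format!("{state}->funded"));
        }
    }
}

fn main() {
    let mut channels = HashMap::from([(1, "unfunded".to_string())]);
    retry_pending(&mut channels);
    assert_eq!(channels[&1], "unfunded->funded");
}
```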
@@ -9608,11 +9689,11 @@ where
 			}
 		}
 
-		for (counterparty_node_id, err) in handle_errors.drain(..) {
+		for (counterparty_node_id, err) in handle_errors {
 			let _ = handle_error!(self, err, counterparty_node_id);
 		}
 
-		for shutdown_result in shutdown_results.drain(..) {
+		for shutdown_result in shutdown_results {
 			self.finish_close_channel(shutdown_result);
 		}
 
@@ -9622,8 +9703,8 @@ where
 	/// Handle a list of channel failures during a block_connected or block_disconnected call,
 	/// pushing the channel monitor update (if any) to the background events queue and removing the
 	/// Channel object.
-	fn handle_init_event_channel_failures(&self, mut failed_channels: Vec<ShutdownResult>) {
-		for mut failure in failed_channels.drain(..) {
+	fn handle_init_event_channel_failures(&self, failed_channels: Vec<ShutdownResult>) {
+		for mut failure in failed_channels {
 			// Either a commitment transactions has been confirmed on-chain or
 			// Channel::block_disconnected detected that the funding transaction has been
 			// reorganized out of the main chain.
@@ -10971,8 +11052,8 @@ where
 				ChannelPhase::UnfundedOutboundV2(_) | ChannelPhase::UnfundedInboundV2(_) => true,
 				ChannelPhase::Funded(channel) => {
 					let res = f(channel);
-					if let Ok((channel_ready_opt, mut timed_out_pending_htlcs, announcement_sigs)) = res {
-						for (source, payment_hash) in timed_out_pending_htlcs.drain(..) {
+					if let Ok((channel_ready_opt, timed_out_pending_htlcs, announcement_sigs)) = res {
+						for (source, payment_hash) in timed_out_pending_htlcs {
 							let (failure_code, data) = self.get_htlc_inbound_temp_fail_err_and_data(0x1000|14 /* expiry_too_soon */, &channel);
 							timed_out_htlcs.push((source, payment_hash, HTLCFailReason::reason(failure_code, data),
 								HTLCDestination::NextHopChannel { node_id: Some(channel.context.get_counterparty_node_id()), channel_id: channel.context.channel_id() }));
@@ -11138,7 +11219,7 @@ where
 		self.handle_init_event_channel_failures(failed_channels);
 
-		for (source, payment_hash, reason, destination) in timed_out_htlcs.drain(..) {
+		for (source, payment_hash, reason, destination) in timed_out_htlcs {
 			self.fail_htlc_backwards_internal(&source, &payment_hash, &reason, destination);
 		}
 	}
@@ -11565,7 +11646,7 @@ where
 		}
 		mem::drop(per_peer_state);
 
-		for failure in failed_channels.drain(..) {
+		for failure in failed_channels {
 			self.finish_close_channel(failure);
 		}
 	}
@@ -13078,13 +13159,13 @@ where
 		entropy_source: ES, node_signer: NS, signer_provider: SP, fee_estimator: F, chain_monitor: M,
 		tx_broadcaster: T, router: R, message_router: MR, logger: L, default_config: UserConfig,
-		mut channel_monitors: Vec<&'a ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>>,
+		channel_monitors: Vec<&'a ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>>,
 	) -> Self {
 		Self {
 			entropy_source, node_signer, signer_provider, fee_estimator, chain_monitor, tx_broadcaster,
 			router, message_router, logger, default_config,
 			channel_monitors: hash_map_from_iter(
-				channel_monitors.drain(..).map(|monitor| { (monitor.get_funding_txo().0, monitor) })
+				channel_monitors.into_iter().map(|monitor| { (monitor.get_funding_txo().0, monitor) })
 			),
 		}
 	}
@@ -13471,7 +13552,7 @@ where
 			pending_outbound_payments = Some(pending_outbound_payments_compat);
 		} else if pending_outbound_payments.is_none() {
 			let mut outbounds = new_hash_map();
-			for (id, session_privs) in pending_outbound_payments_no_retry.unwrap().drain() {
+			for (id, session_privs) in pending_outbound_payments_no_retry.unwrap() {
 				outbounds.insert(id, PendingOutboundPayment::Legacy { session_privs });
 			}
 			pending_outbound_payments = Some(outbounds);
@@ -13867,7 +13948,7 @@ where
 			} else {
 				// LDK versions prior to 0.0.107 did not write a `pending_htlc_purposes`, but do
 				// include a `_legacy_hop_data` in the `OnionPayload`.
-				for (payment_hash, htlcs) in claimable_htlcs_list.drain(..) {
+				for (payment_hash, htlcs) in claimable_htlcs_list {
 					if htlcs.is_empty() {
 						return Err(DecodeError::InvalidValue);
 					}
@@ -14228,7 +14309,7 @@ where
 			}
 		}
 
-		for htlc_source in failed_htlcs.drain(..) {
+		for htlc_source in failed_htlcs {
 			let (source, payment_hash, counterparty_node_id, channel_id) = htlc_source;
 			let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id };
 			let reason = HTLCFailReason::from_failure_code(0x4000 | 8);
@@ -14377,17 +14458,17 @@ mod tests {
 		nodes[0].node.test_send_payment_along_path(&mpp_route.paths[0], &our_payment_hash,
 			RecipientOnionFields::secret_only(payment_secret), 200_000, cur_height, payment_id, &None, session_privs[0]).unwrap();
 		check_added_monitors!(nodes[0], 1);
 
-		let mut events = nodes[0].node.get_and_clear_pending_msg_events();
+		let events = nodes[0].node.get_and_clear_pending_msg_events();
 		assert_eq!(events.len(), 1);
-		pass_along_path(&nodes[0], &[&nodes[1]], 200_000, our_payment_hash, Some(payment_secret), events.drain(..).next().unwrap(), false, None);
+		pass_along_path(&nodes[0], &[&nodes[1]], 200_000, our_payment_hash, Some(payment_secret), events.into_iter().next().unwrap(), false, None);
 
 		// Next, send a keysend payment with the same payment_hash and make sure it fails.
 		nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage),
 			RecipientOnionFields::spontaneous_empty(), PaymentId(payment_preimage.0)).unwrap();
 		check_added_monitors!(nodes[0], 1);
-		let mut events = nodes[0].node.get_and_clear_pending_msg_events();
+		let events = nodes[0].node.get_and_clear_pending_msg_events();
 		assert_eq!(events.len(), 1);
-		let ev = events.drain(..).next().unwrap();
+		let ev = events.into_iter().next().unwrap();
 		let payment_event = SendEvent::from_event(ev);
 		nodes[1].node.handle_update_add_htlc(nodes[0].node.get_our_node_id(), &payment_event.msgs[0]);
 		check_added_monitors!(nodes[1], 0);
@@ -14409,9 +14490,9 @@ mod tests {
 		nodes[0].node.test_send_payment_along_path(&mpp_route.paths[1], &our_payment_hash,
 			RecipientOnionFields::secret_only(payment_secret), 200_000, cur_height, payment_id, &None, session_privs[1]).unwrap();
 		check_added_monitors!(nodes[0], 1);
 
-		let mut events = nodes[0].node.get_and_clear_pending_msg_events();
+		let events = nodes[0].node.get_and_clear_pending_msg_events();
 		assert_eq!(events.len(), 1);
-		pass_along_path(&nodes[0], &[&nodes[1]], 200_000, our_payment_hash, Some(payment_secret), events.drain(..).next().unwrap(), true, None);
+		pass_along_path(&nodes[0], &[&nodes[1]], 200_000, our_payment_hash, Some(payment_secret), events.into_iter().next().unwrap(), true, None);
 
 		// Claim the full MPP payment. Note that we can't use a test utility like
 		// claim_funds_along_route because the ordering of the messages causes the second half of the
@@ -14512,9 +14593,9 @@ mod tests {
 		nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage),
 			RecipientOnionFields::spontaneous_empty(), PaymentId(payment_preimage.0)).unwrap();
 		check_added_monitors!(nodes[0], 1);
 
-		let mut events = nodes[0].node.get_and_clear_pending_msg_events();
+		let events = nodes[0].node.get_and_clear_pending_msg_events();
 		assert_eq!(events.len(), 1);
-		let ev = events.drain(..).next().unwrap();
+		let ev = events.into_iter().next().unwrap();
 		let payment_event = SendEvent::from_event(ev);
 		nodes[1].node.handle_update_add_htlc(nodes[0].node.get_our_node_id(), &payment_event.msgs[0]);
 		check_added_monitors!(nodes[1], 0);
@@ -14557,9 +14638,9 @@ mod tests {
 		nodes[0].node.send_payment_with_route(route.clone(), payment_hash,
 			RecipientOnionFields::secret_only(payment_secret), PaymentId(payment_hash.0)).unwrap();
 		check_added_monitors!(nodes[0], 1);
 
-		let mut events = nodes[0].node.get_and_clear_pending_msg_events();
+		let events = nodes[0].node.get_and_clear_pending_msg_events();
 		assert_eq!(events.len(), 1);
-		let ev = events.drain(..).next().unwrap();
+		let ev = events.into_iter().next().unwrap();
 		let payment_event = SendEvent::from_event(ev);
 		nodes[1].node.handle_update_add_htlc(nodes[0].node.get_our_node_id(), &payment_event.msgs[0]);
 		check_added_monitors!(nodes[1], 0);
@@ -14604,9 +14685,9 @@ mod tests {
 		nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage),
 			RecipientOnionFields::spontaneous_empty(), payment_id_2).unwrap();
 		check_added_monitors!(nodes[0], 1);
 
-		let mut events = nodes[0].node.get_and_clear_pending_msg_events();
+		let events = nodes[0].node.get_and_clear_pending_msg_events();
 		assert_eq!(events.len(), 1);
-		let ev = events.drain(..).next().unwrap();
+		let ev = events.into_iter().next().unwrap();
 		let payment_event = SendEvent::from_event(ev);
 		nodes[1].node.handle_update_add_htlc(nodes[0].node.get_our_node_id(), &payment_event.msgs[0]);
 		check_added_monitors!(nodes[1], 0);
diff --git a/lightning/src/ln/dual_funding_tests.rs b/lightning/src/ln/dual_funding_tests.rs
index 7742931cd9f..16eea3d812b 100644
--- a/lightning/src/ln/dual_funding_tests.rs
+++ b/lightning/src/ln/dual_funding_tests.rs
@@ -28,6 +28,8 @@ use crate::ln::types::ChannelId;
 use crate::prelude::*;
 use crate::sign::{ChannelSigner as _, P2WPKH_WITNESS_WEIGHT};
 use crate::util::ser::TransactionU16LenLimited;
+#[cfg(async_signing)]
+use crate::util::test_channel_signer::SignerOp;
 use crate::util::test_utils;
 
 // Dual-funding: V2 Channel Establishment Tests
@@ -129,9 +131,29 @@ fn do_test_v2_channel_establishment(
 	let _tx_complete_msg = get_event_msg!(nodes[1], MessageSendEvent::SendTxComplete, nodes[0].node.get_our_node_id());
 
-	let tx_complete_msg = TxComplete { channel_id };
+	#[cfg(async_signing)]
+	{
+		nodes[1].disable_channel_signer_op(
+			&nodes[0].node.get_our_node_id(),
+			&channel_id,
+			SignerOp::SignCounterpartyCommitment,
+		);
+	}
 
+	let tx_complete_msg = TxComplete { channel_id };
 	nodes[1].node.handle_tx_complete(nodes[0].node.get_our_node_id(), &tx_complete_msg);
+
+	#[cfg(async_signing)]
+	{
+		assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
+		nodes[1].enable_channel_signer_op(
+			&nodes[0].node.get_our_node_id(),
+			&channel_id,
+			SignerOp::SignCounterpartyCommitment,
+		);
+		nodes[1].node.signer_unblocked(Some((nodes[0].node.get_our_node_id(), channel_id)));
+	}
+
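The async variant of the test gates `SignCounterpartyCommitment`, asserts that no `commitment_signed` is emitted while the signer is blocked, then re-enables the op and calls `signer_unblocked` to release the pending message. Reduced to a standalone sketch (the `TestSigner` type is hypothetical, not LDK's test signer):

```rust
// Gate-then-unblock shape of the test above, with an illustrative stand-in
// for a signer whose operations can be toggled on and off.
struct TestSigner {
    available: bool,
}

impl TestSigner {
    fn sign(&self) -> Option<&'static str> {
        self.available.then_some("sig")
    }
}

fn main() {
    let mut signer = TestSigner { available: false };
    // While the op is disabled, no commitment_signed can be produced.
    assert!(signer.sign().is_none());

    // Re-enable and "unblock": the pending signature now appears.
    signer.available = true;
    assert_eq!(signer.sign(), Some("sig"));
}
```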
 	let msg_events = nodes[1].node.get_and_clear_pending_msg_events();
 	assert_eq!(msg_events.len(), 1);
 	let _msg_commitment_signed_from_1 = match msg_events[0] {
diff --git a/lightning/src/ln/interactivetxs.rs b/lightning/src/ln/interactivetxs.rs
index 2b72133ec09..d9de30fb019 100644
--- a/lightning/src/ln/interactivetxs.rs
+++ b/lightning/src/ln/interactivetxs.rs
@@ -327,7 +327,7 @@ impl InteractiveTxSigningSession {
 		if self.remote_inputs_count() != tx_signatures.witnesses.len() {
 			return Err(());
 		}
-		self.unsigned_tx.add_remote_witnesses(tx_signatures.witnesses.clone());
+		self.unsigned_tx.add_remote_witnesses(tx_signatures.witnesses);
 		self.counterparty_sent_tx_signatures = true;
 
 		let holder_tx_signatures = if !self.holder_sends_tx_signatures_first {
@@ -360,7 +360,7 @@ impl InteractiveTxSigningSession {
 		self.holder_tx_signatures = Some(TxSignatures {
 			channel_id,
 			tx_hash: self.unsigned_tx.compute_txid(),
-			witnesses: witnesses.into_iter().collect(),
+			witnesses,
 			shared_input_signature: None,
 		});
 		if self.received_commitment_signed