From 210db5db309f4374f34dbdacd6e99967f9f0b7c8 Mon Sep 17 00:00:00 2001 From: xdustinface Date: Thu, 26 Feb 2026 08:47:02 +0700 Subject: [PATCH] refactor: add `clear_in_flight_state()` to `SyncManager` trait Add a required `clear_in_flight_state()` method to the `SyncManager` trait and implement it for all managers. This gets called by the default `stop_sync()` implementation to force every manager to handle disconnect cleanup. --- dash-spv/src/sync/block_headers/pipeline.rs | 5 +++++ dash-spv/src/sync/block_headers/sync_manager.rs | 7 +++++++ dash-spv/src/sync/blocks/sync_manager.rs | 5 +++-- dash-spv/src/sync/chainlock/manager.rs | 2 +- dash-spv/src/sync/chainlock/sync_manager.rs | 5 +++++ dash-spv/src/sync/filter_headers/manager.rs | 2 +- dash-spv/src/sync/filter_headers/sync_manager.rs | 6 ++++++ dash-spv/src/sync/filters/manager.rs | 14 ++------------ dash-spv/src/sync/filters/sync_manager.rs | 10 +++++++--- dash-spv/src/sync/instantsend/manager.rs | 4 ++-- dash-spv/src/sync/instantsend/sync_manager.rs | 4 ++++ dash-spv/src/sync/masternodes/sync_manager.rs | 6 ++++++ dash-spv/src/sync/sync_manager.rs | 7 +++++++ 13 files changed, 56 insertions(+), 21 deletions(-) diff --git a/dash-spv/src/sync/block_headers/pipeline.rs b/dash-spv/src/sync/block_headers/pipeline.rs index af08a0a68..cf516f4bf 100644 --- a/dash-spv/src/sync/block_headers/pipeline.rs +++ b/dash-spv/src/sync/block_headers/pipeline.rs @@ -52,6 +52,11 @@ impl HeadersPipeline { } } + /// Get a reference to the checkpoint manager. + pub fn checkpoint_manager(&self) -> &Arc { + &self.checkpoint_manager + } + /// Initialize the pipeline for downloading from current_height to target_height. pub fn init(&mut self, current_height: u32, current_hash: BlockHash, target_height: u32) { self.segments.clear(); diff --git a/dash-spv/src/sync/block_headers/sync_manager.rs b/dash-spv/src/sync/block_headers/sync_manager.rs index a040c9b1f..7a35c933a 100644 --- a/dash-spv/src/sync/block_headers/sync_manager.rs +++ b/dash-spv/src/sync/block_headers/sync_manager.rs @@ -1,6 +1,7 @@ use crate::error::SyncResult; use crate::network::{Message, MessageType, NetworkEvent, RequestSender}; use crate::storage::{BlockHeaderStorage, MetadataStorage}; +use crate::sync::block_headers::pipeline::HeadersPipeline; use crate::sync::sync_manager::ensure_not_started; use crate::sync::{ BlockHeadersManager, ManagerIdentifier, ProgressPercentage, SyncEvent, SyncManager, @@ -36,6 +37,12 @@ impl SyncManager for BlockHeadersMana &[MessageType::Headers, MessageType::Inv] } + fn clear_in_flight_state(&mut self) { + let checkpoint_manager = self.pipeline.checkpoint_manager().clone(); + self.pipeline = HeadersPipeline::new(checkpoint_manager); + self.pending_announcements.clear(); + } + async fn start_sync(&mut self, requests: &RequestSender) -> SyncResult> { ensure_not_started(self.state(), self.identifier())?; self.progress.set_state(SyncState::Syncing); diff --git a/dash-spv/src/sync/blocks/sync_manager.rs b/dash-spv/src/sync/blocks/sync_manager.rs index d8bc875db..d869b5d2b 100644 --- a/dash-spv/src/sync/blocks/sync_manager.rs +++ b/dash-spv/src/sync/blocks/sync_manager.rs @@ -1,6 +1,7 @@ use crate::error::SyncResult; use crate::network::{Message, MessageType, RequestSender}; use crate::storage::{BlockHeaderStorage, BlockStorage}; +use crate::sync::blocks::pipeline::BlocksPipeline; use crate::sync::sync_manager::ensure_not_started; use crate::sync::{ BlocksManager, ManagerIdentifier, SyncEvent, SyncManager, SyncManagerProgress, SyncState, @@ -45,8 +46,8 @@ impl SyncM Ok(vec![]) } - fn stop_sync(&mut self) { - self.progress.set_state(SyncState::WaitingForConnections); + fn clear_in_flight_state(&mut self) { + self.pipeline = BlocksPipeline::new(); self.filters_sync_complete = false; } diff --git a/dash-spv/src/sync/chainlock/manager.rs b/dash-spv/src/sync/chainlock/manager.rs index fb29d436e..abbb4b766 100644 --- a/dash-spv/src/sync/chainlock/manager.rs +++ b/dash-spv/src/sync/chainlock/manager.rs @@ -41,7 +41,7 @@ pub struct ChainLockManager { /// ChainLock hashes that have been requested (to avoid duplicate requests). pub(super) requested_chainlocks: HashSet, /// Whether masternode sync is complete and we can validate signatures. - masternode_ready: bool, + pub(super) masternode_ready: bool, } impl ChainLockManager { diff --git a/dash-spv/src/sync/chainlock/sync_manager.rs b/dash-spv/src/sync/chainlock/sync_manager.rs index 78cd1ff26..7fc277cf2 100644 --- a/dash-spv/src/sync/chainlock/sync_manager.rs +++ b/dash-spv/src/sync/chainlock/sync_manager.rs @@ -26,6 +26,11 @@ impl SyncManager for ChainLockManager &[MessageType::CLSig, MessageType::Inv] } + fn clear_in_flight_state(&mut self) { + self.requested_chainlocks.clear(); + self.masternode_ready = false; + } + async fn handle_message( &mut self, msg: Message, diff --git a/dash-spv/src/sync/filter_headers/manager.rs b/dash-spv/src/sync/filter_headers/manager.rs index 43a0ecc86..10b197736 100644 --- a/dash-spv/src/sync/filter_headers/manager.rs +++ b/dash-spv/src/sync/filter_headers/manager.rs @@ -36,7 +36,7 @@ pub struct FilterHeadersManager /// Pipeline for downloading filter headers. pub(super) pipeline: FilterHeadersPipeline, /// Checkpoint start height - set when syncing from checkpoint to store prev header once. - checkpoint_start_height: Option, + pub(super) checkpoint_start_height: Option, } impl FilterHeadersManager { diff --git a/dash-spv/src/sync/filter_headers/sync_manager.rs b/dash-spv/src/sync/filter_headers/sync_manager.rs index 22bbbb6d8..c67426d01 100644 --- a/dash-spv/src/sync/filter_headers/sync_manager.rs +++ b/dash-spv/src/sync/filter_headers/sync_manager.rs @@ -1,6 +1,7 @@ use crate::error::SyncResult; use crate::network::{Message, MessageType, RequestSender}; use crate::storage::{BlockHeaderStorage, FilterHeaderStorage}; +use crate::sync::filter_headers::pipeline::FilterHeadersPipeline; use crate::sync::progress::ProgressPercentage; use crate::sync::{ FilterHeadersManager, ManagerIdentifier, SyncEvent, SyncManager, SyncManagerProgress, SyncState, @@ -30,6 +31,11 @@ impl SyncManager for FilterHeade &[MessageType::CFHeaders] } + fn clear_in_flight_state(&mut self) { + self.pipeline = FilterHeadersPipeline::default(); + self.checkpoint_start_height = None; + } + async fn handle_message( &mut self, msg: Message, diff --git a/dash-spv/src/sync/filters/manager.rs b/dash-spv/src/sync/filters/manager.rs index 9a7592418..b3d5e04f6 100644 --- a/dash-spv/src/sync/filters/manager.rs +++ b/dash-spv/src/sync/filters/manager.rs @@ -57,7 +57,7 @@ pub struct FiltersManager< /// Pipeline for downloading filters. pub(super) filter_pipeline: FiltersPipeline, /// Completed batches waiting for verification and storage. - pending_batches: BTreeSet, + pub(super) pending_batches: BTreeSet, /// Next batch start height to store (for filter verification/storage). next_batch_to_store: u32, @@ -70,7 +70,7 @@ pub struct FiltersManager< /// Maps block_hash -> (height, batch_start) for batch association. pub(super) blocks_remaining: BTreeMap, /// Block hashes that have been matched and queued for download. - filters_matched: HashSet, + pub(super) filters_matched: HashSet, } impl @@ -119,16 +119,6 @@ impl SyncResult> { diff --git a/dash-spv/src/sync/instantsend/manager.rs b/dash-spv/src/sync/instantsend/manager.rs index dd263a977..5cc3e9c3b 100644 --- a/dash-spv/src/sync/instantsend/manager.rs +++ b/dash-spv/src/sync/instantsend/manager.rs @@ -41,7 +41,7 @@ pub struct InstantLockEntry { /// Pending InstantLock awaiting validation with retry tracking. #[derive(Debug, Clone)] -struct PendingInstantLock { +pub(super) struct PendingInstantLock { /// The InstantLock data. instant_lock: InstantLock, /// Number of validation retry attempts. @@ -63,7 +63,7 @@ pub struct InstantSendManager { /// InstantLocks indexed by txid. instantlocks: HashMap, /// Pending InstantLocks awaiting validation with retry tracking. - pending_instantlocks: Vec, + pub(super) pending_instantlocks: Vec, } impl InstantSendManager { diff --git a/dash-spv/src/sync/instantsend/sync_manager.rs b/dash-spv/src/sync/instantsend/sync_manager.rs index 4636f630b..b9f58b77b 100644 --- a/dash-spv/src/sync/instantsend/sync_manager.rs +++ b/dash-spv/src/sync/instantsend/sync_manager.rs @@ -25,6 +25,10 @@ impl SyncManager for InstantSendManager { &[MessageType::ISLock, MessageType::Inv] } + fn clear_in_flight_state(&mut self) { + self.pending_instantlocks.clear(); + } + async fn handle_message( &mut self, msg: Message, diff --git a/dash-spv/src/sync/masternodes/sync_manager.rs b/dash-spv/src/sync/masternodes/sync_manager.rs index 5bd5fea81..6d2df7b2c 100644 --- a/dash-spv/src/sync/masternodes/sync_manager.rs +++ b/dash-spv/src/sync/masternodes/sync_manager.rs @@ -234,6 +234,12 @@ impl SyncManager for MasternodesManager { &[MessageType::MnListDiff, MessageType::QRInfo] } + fn clear_in_flight_state(&mut self) { + self.sync_state.clear_pending(); + self.sync_state.qrinfo_retry_count = 0; + self.sync_state.chainlock_retry_after = None; + } + async fn handle_message( &mut self, msg: Message, diff --git a/dash-spv/src/sync/sync_manager.rs b/dash-spv/src/sync/sync_manager.rs index 2d8189b03..72f423e1b 100644 --- a/dash-spv/src/sync/sync_manager.rs +++ b/dash-spv/src/sync/sync_manager.rs @@ -115,8 +115,13 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { /// Called when the network manager loses its peers. fn stop_sync(&mut self) { self.set_state(SyncState::WaitingForConnections); + self.clear_in_flight_state(); } + /// Clear all in-flight requests, pipelines, and retry state. + /// Called on disconnect when pending network requests become invalid. + fn clear_in_flight_state(&mut self); + /// Handle an incoming network message. /// /// Returns events to emit to other managers. @@ -362,6 +367,8 @@ mod tests { &[] } + fn clear_in_flight_state(&mut self) {} + async fn handle_message( &mut self, _msg: Message,