Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions dash-spv/src/sync/block_headers/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ impl HeadersPipeline {
}
}

/// Get a reference to the checkpoint manager.
pub fn checkpoint_manager(&self) -> &Arc<CheckpointManager> {
&self.checkpoint_manager
}

/// Initialize the pipeline for downloading from current_height to target_height.
pub fn init(&mut self, current_height: u32, current_hash: BlockHash, target_height: u32) {
self.segments.clear();
Expand Down
7 changes: 7 additions & 0 deletions dash-spv/src/sync/block_headers/sync_manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::error::SyncResult;
use crate::network::{Message, MessageType, NetworkEvent, RequestSender};
use crate::storage::{BlockHeaderStorage, MetadataStorage};
use crate::sync::block_headers::pipeline::HeadersPipeline;
use crate::sync::sync_manager::ensure_not_started;
use crate::sync::{
BlockHeadersManager, ManagerIdentifier, ProgressPercentage, SyncEvent, SyncManager,
Expand Down Expand Up @@ -36,6 +37,12 @@ impl<H: BlockHeaderStorage, M: MetadataStorage> SyncManager for BlockHeadersMana
&[MessageType::Headers, MessageType::Inv]
}

fn clear_in_flight_state(&mut self) {
let checkpoint_manager = self.pipeline.checkpoint_manager().clone();
self.pipeline = HeadersPipeline::new(checkpoint_manager);
self.pending_announcements.clear();
}

async fn start_sync(&mut self, requests: &RequestSender) -> SyncResult<Vec<SyncEvent>> {
ensure_not_started(self.state(), self.identifier())?;
self.progress.set_state(SyncState::Syncing);
Expand Down
5 changes: 3 additions & 2 deletions dash-spv/src/sync/blocks/sync_manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::error::SyncResult;
use crate::network::{Message, MessageType, RequestSender};
use crate::storage::{BlockHeaderStorage, BlockStorage};
use crate::sync::blocks::pipeline::BlocksPipeline;
use crate::sync::sync_manager::ensure_not_started;
use crate::sync::{
BlocksManager, ManagerIdentifier, SyncEvent, SyncManager, SyncManagerProgress, SyncState,
Expand Down Expand Up @@ -45,8 +46,8 @@ impl<H: BlockHeaderStorage, B: BlockStorage, W: WalletInterface + 'static> SyncM
Ok(vec![])
}

fn stop_sync(&mut self) {
self.progress.set_state(SyncState::WaitingForConnections);
fn clear_in_flight_state(&mut self) {
self.pipeline = BlocksPipeline::new();
self.filters_sync_complete = false;
}

Expand Down
2 changes: 1 addition & 1 deletion dash-spv/src/sync/chainlock/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub struct ChainLockManager<H: BlockHeaderStorage, M: MetadataStorage> {
/// ChainLock hashes that have been requested (to avoid duplicate requests).
pub(super) requested_chainlocks: HashSet<ChainLockHash>,
/// Whether masternode sync is complete and we can validate signatures.
masternode_ready: bool,
pub(super) masternode_ready: bool,
}

impl<H: BlockHeaderStorage, M: MetadataStorage> ChainLockManager<H, M> {
Expand Down
5 changes: 5 additions & 0 deletions dash-spv/src/sync/chainlock/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ impl<H: BlockHeaderStorage, M: MetadataStorage> SyncManager for ChainLockManager
&[MessageType::CLSig, MessageType::Inv]
}

fn clear_in_flight_state(&mut self) {
self.requested_chainlocks.clear();
self.masternode_ready = false;
}

async fn handle_message(
&mut self,
msg: Message,
Expand Down
2 changes: 1 addition & 1 deletion dash-spv/src/sync/filter_headers/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub struct FilterHeadersManager<H: BlockHeaderStorage, FH: FilterHeaderStorage>
/// Pipeline for downloading filter headers.
pub(super) pipeline: FilterHeadersPipeline,
/// Checkpoint start height - set when syncing from checkpoint to store prev header once.
checkpoint_start_height: Option<u32>,
pub(super) checkpoint_start_height: Option<u32>,
}

impl<H: BlockHeaderStorage, FH: FilterHeaderStorage> FilterHeadersManager<H, FH> {
Expand Down
6 changes: 6 additions & 0 deletions dash-spv/src/sync/filter_headers/sync_manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::error::SyncResult;
use crate::network::{Message, MessageType, RequestSender};
use crate::storage::{BlockHeaderStorage, FilterHeaderStorage};
use crate::sync::filter_headers::pipeline::FilterHeadersPipeline;
use crate::sync::progress::ProgressPercentage;
use crate::sync::{
FilterHeadersManager, ManagerIdentifier, SyncEvent, SyncManager, SyncManagerProgress, SyncState,
Expand Down Expand Up @@ -30,6 +31,11 @@ impl<H: BlockHeaderStorage, FH: FilterHeaderStorage> SyncManager for FilterHeade
&[MessageType::CFHeaders]
}

fn clear_in_flight_state(&mut self) {
self.pipeline = FilterHeadersPipeline::default();
self.checkpoint_start_height = None;
}

async fn handle_message(
&mut self,
msg: Message,
Expand Down
14 changes: 2 additions & 12 deletions dash-spv/src/sync/filters/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub struct FiltersManager<
/// Pipeline for downloading filters.
pub(super) filter_pipeline: FiltersPipeline,
/// Completed batches waiting for verification and storage.
pending_batches: BTreeSet<FiltersBatch>,
pub(super) pending_batches: BTreeSet<FiltersBatch>,
/// Next batch start height to store (for filter verification/storage).
next_batch_to_store: u32,

Expand All @@ -70,7 +70,7 @@ pub struct FiltersManager<
/// Maps block_hash -> (height, batch_start) for batch association.
pub(super) blocks_remaining: BTreeMap<BlockHash, (u32, u32)>,
/// Block hashes that have been matched and queued for download.
filters_matched: HashSet<BlockHash>,
pub(super) filters_matched: HashSet<BlockHash>,
}

impl<H: BlockHeaderStorage, FH: FilterHeaderStorage, F: FilterStorage, W: WalletInterface>
Expand Down Expand Up @@ -119,16 +119,6 @@ impl<H: BlockHeaderStorage, FH: FilterHeaderStorage, F: FilterStorage, W: Wallet
&& self.filter_pipeline.is_idle()
}

/// Clear all in-flight processing state. Called on peer disconnect when
/// in-flight filter batches and block tracking become invalid.
pub(super) fn clear_in_flight_state(&mut self) {
self.active_batches.clear();
self.blocks_remaining.clear();
self.filters_matched.clear();
self.pending_batches.clear();
self.filter_pipeline = FiltersPipeline::new();
}

async fn load_filters(
&self,
start_height: u32,
Expand Down
10 changes: 7 additions & 3 deletions dash-spv/src/sync/filters/sync_manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::error::{SyncError, SyncResult};
use crate::network::{Message, MessageType, RequestSender};
use crate::storage::{BlockHeaderStorage, FilterHeaderStorage, FilterStorage};
use crate::sync::filters::pipeline::FiltersPipeline;
use crate::sync::progress::ProgressPercentage;
use crate::sync::sync_manager::ensure_not_started;
use crate::sync::{
Expand Down Expand Up @@ -38,9 +39,12 @@ impl<
&[MessageType::CFilter]
}

fn stop_sync(&mut self) {
self.set_state(SyncState::WaitingForConnections);
self.clear_in_flight_state();
fn clear_in_flight_state(&mut self) {
self.active_batches.clear();
self.blocks_remaining.clear();
self.filters_matched.clear();
self.pending_batches.clear();
self.filter_pipeline = FiltersPipeline::new();
}

async fn start_sync(&mut self, requests: &RequestSender) -> SyncResult<Vec<SyncEvent>> {
Expand Down
4 changes: 2 additions & 2 deletions dash-spv/src/sync/instantsend/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub struct InstantLockEntry {

/// Pending InstantLock awaiting validation with retry tracking.
#[derive(Debug, Clone)]
struct PendingInstantLock {
pub(super) struct PendingInstantLock {
/// The InstantLock data.
instant_lock: InstantLock,
/// Number of validation retry attempts.
Expand All @@ -63,7 +63,7 @@ pub struct InstantSendManager {
/// InstantLocks indexed by txid.
instantlocks: HashMap<Txid, InstantLockEntry>,
/// Pending InstantLocks awaiting validation with retry tracking.
pending_instantlocks: Vec<PendingInstantLock>,
pub(super) pending_instantlocks: Vec<PendingInstantLock>,
}

impl InstantSendManager {
Expand Down
4 changes: 4 additions & 0 deletions dash-spv/src/sync/instantsend/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ impl SyncManager for InstantSendManager {
&[MessageType::ISLock, MessageType::Inv]
}

fn clear_in_flight_state(&mut self) {
self.pending_instantlocks.clear();
}

async fn handle_message(
&mut self,
msg: Message,
Expand Down
6 changes: 6 additions & 0 deletions dash-spv/src/sync/masternodes/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,12 @@ impl<H: BlockHeaderStorage> SyncManager for MasternodesManager<H> {
&[MessageType::MnListDiff, MessageType::QRInfo]
}

fn clear_in_flight_state(&mut self) {
self.sync_state.clear_pending();
self.sync_state.qrinfo_retry_count = 0;
self.sync_state.chainlock_retry_after = None;
}

async fn handle_message(
&mut self,
msg: Message,
Expand Down
7 changes: 7 additions & 0 deletions dash-spv/src/sync/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,13 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug {
/// Called when the network manager loses its peers.
fn stop_sync(&mut self) {
self.set_state(SyncState::WaitingForConnections);
self.clear_in_flight_state();
}

/// Clear all in-flight requests, pipelines, and retry state.
/// Called on disconnect when pending network requests become invalid.
fn clear_in_flight_state(&mut self);

/// Handle an incoming network message.
///
/// Returns events to emit to other managers.
Expand Down Expand Up @@ -362,6 +367,8 @@ mod tests {
&[]
}

fn clear_in_flight_state(&mut self) {}

async fn handle_message(
&mut self,
_msg: Message,
Expand Down
Loading