Skip to content

Commit f409278

Browse files
refactor(sync): Unify header sync finalization for regular and compressed headers
- Extract `finalize_headers_sync` helper to eliminate duplicated phase update logic - Return decompressed header count from `handle_headers2_message` so both paths can track `headers_downloaded` and `headers_per_second` stats uniformly - Remove special-case handling that skipped stats for compressed headers 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent ae449db commit f409278

File tree

2 files changed

+41
-55
lines changed

2 files changed

+41
-55
lines changed

dash-spv/src/sync/headers/manager.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -432,7 +432,7 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
432432
peer_id: crate::types::PeerId,
433433
storage: &mut S,
434434
network: &mut N,
435-
) -> SyncResult<bool> {
435+
) -> SyncResult<(bool, usize)> {
436436
tracing::info!(
437437
"📦 Received {} compressed headers from peer {}",
438438
headers2.headers.len(),
@@ -503,8 +503,12 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
503503
stats.compression_ratio * 100.0
504504
);
505505

506+
let headers_count = headers.len();
507+
506508
// Process decompressed headers through the normal flow
507-
self.handle_headers_message(headers, storage, network).await
509+
let continue_sync = self.handle_headers_message(headers, storage, network).await?;
510+
511+
Ok((continue_sync, headers_count))
508512
}
509513

510514
/// Prepare sync state without sending network requests.

dash-spv/src/sync/message_handlers.rs

Lines changed: 35 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -291,12 +291,12 @@ impl<
291291
network: &mut N,
292292
storage: &mut S,
293293
) -> SyncResult<()> {
294-
let continue_sync = match self
294+
let (continue_sync, headers_count) = match self
295295
.header_sync
296296
.handle_headers2_message(headers2, peer_id, storage, network)
297297
.await
298298
{
299-
Ok(continue_sync) => continue_sync,
299+
Ok(result) => result,
300300
Err(SyncError::Headers2DecompressionFailed(e)) => {
301301
// Headers2 decompression failed - we should fall back to regular headers
302302
tracing::warn!("Headers2 decompression failed: {} - peer may not properly support headers2 or connection issue", e);
@@ -306,46 +306,14 @@ impl<
306306
Err(e) => return Err(e),
307307
};
308308

309-
// Calculate blockchain height before borrowing self.current_phase
310-
let blockchain_height = self.get_blockchain_height_from_storage(storage).await.unwrap_or(0);
311-
312-
// Update phase state and check if we need to transition
313-
let should_transition = if let SyncPhase::DownloadingHeaders {
314-
current_height,
315-
received_empty_response,
316-
last_progress,
317-
..
318-
} = &mut self.current_phase
319-
{
320-
// Update current height - use blockchain height for checkpoint awareness
321-
*current_height = blockchain_height;
322-
323-
// Note: We can't easily track headers_downloaded for compressed headers
324-
// without decompressing first, so we rely on the header sync manager's internal stats
325-
326-
// Mark sync complete if continue_sync is false (no more headers)
327-
if !continue_sync {
328-
*received_empty_response = true;
329-
}
330-
331-
// Update progress time
332-
*last_progress = Instant::now();
333-
334-
// Check if phase is complete
335-
!continue_sync || *received_empty_response
336-
} else {
337-
false
338-
};
339-
340-
if should_transition {
341-
self.transition_to_next_phase(storage, network, "Headers sync complete via Headers2")
342-
.await?;
343-
344-
// Execute the next phase
345-
self.execute_current_phase(network, storage).await?;
346-
}
347-
348-
Ok(())
309+
self.finalize_headers_sync(
310+
continue_sync,
311+
headers_count as u32,
312+
network,
313+
storage,
314+
"Headers sync complete via Headers2",
315+
)
316+
.await
349317
}
350318

351319
pub(super) async fn handle_headers_message(
@@ -357,10 +325,28 @@ impl<
357325
let continue_sync =
358326
self.header_sync.handle_headers_message(headers.clone(), storage, network).await?;
359327

360-
// Calculate blockchain height before borrowing self.current_phase
328+
self.finalize_headers_sync(
329+
continue_sync,
330+
headers.len() as u32,
331+
network,
332+
storage,
333+
"Headers sync complete",
334+
)
335+
.await
336+
}
337+
338+
/// Common logic for finalizing header sync after processing headers (regular or compressed).
339+
/// Updates phase state, marks completion, and triggers phase transition if needed.
340+
async fn finalize_headers_sync(
341+
&mut self,
342+
continue_sync: bool,
343+
headers_count: u32,
344+
network: &mut N,
345+
storage: &mut S,
346+
transition_reason: &str,
347+
) -> SyncResult<()> {
361348
let blockchain_height = self.get_blockchain_height_from_storage(storage).await.unwrap_or(0);
362349

363-
// Update phase state and check if we need to transition
364350
let should_transition = if let SyncPhase::DownloadingHeaders {
365351
current_height,
366352
headers_downloaded,
@@ -371,32 +357,28 @@ impl<
371357
..
372358
} = &mut self.current_phase
373359
{
374-
// Update current height - use blockchain height for checkpoint awareness
375360
*current_height = blockchain_height;
376361

377-
// Update progress
378-
*headers_downloaded += headers.len() as u32;
362+
*headers_downloaded += headers_count;
379363
let elapsed = start_time.elapsed().as_secs_f64();
380364
if elapsed > 0.0 {
381365
*headers_per_second = *headers_downloaded as f64 / elapsed;
382366
}
383367

384-
// Check if we received empty response (sync complete)
385-
if headers.is_empty() {
368+
// Mark sync complete - this flag is checked by are_headers_complete() during transition
369+
if !continue_sync {
386370
*received_empty_response = true;
387371
}
388372

389-
// Update progress time
390373
*last_progress = Instant::now();
391374

392-
// Check if phase is complete
393-
!continue_sync || *received_empty_response
375+
!continue_sync
394376
} else {
395377
false
396378
};
397379

398380
if should_transition {
399-
self.transition_to_next_phase(storage, network, "Headers sync complete").await?;
381+
self.transition_to_next_phase(storage, network, transition_reason).await?;
400382
self.execute_current_phase(network, storage).await?;
401383
}
402384

0 commit comments

Comments
 (0)