Skip to content

Commit fd42619

Browse files
fix(headers2): Fix compressed headers protocol compatibility with Dash Core
This commit fixes critical incompatibilities between the Rust headers2 implementation and the C++ Dash Core reference implementation (DIP-0025). ## Issues Fixed ### Version Offset Encoding Mismatch - C++ uses offset=0 for "version not in cache" (full version present) - C++ uses offset=1-7 for "version at position offset-1 in cache" - Rust incorrectly used offset=7 for uncompressed, offset=0-6 for cached - Now matches C++ semantics exactly ### Version Cache Data Structure - C++ uses std::list with MRU (Most Recently Used) reordering - Rust used a circular buffer without MRU reordering - Changed to Vec<i32> with proper MRU behavior matching C++ ### Serialization/Deserialization - Fixed Decodable impl to read version when offset=0 (not offset=7) - Added MissingVersion error variant for proper error handling ## Changes ### dashcore (dash/src/network/message_headers2.rs) - Rewrote CompressionState to use Vec with MRU reordering - Fixed compress() to use offset=0 for uncompressed versions - Fixed decompress() to handle C++ offset semantics - Updated Decodable to read version when offset=0 - Added comprehensive tests for C++ compatibility ### dash-spv - Enabled headers2 in handshake negotiation - Enabled headers2 in sync manager - Fixed phase transition when receiving empty headers2 response - Re-enabled has_headers2_peer() check ## Testing - Added headers2_compatibility_test.rs with 12 tests verifying: - Version offset C++ semantics - MRU cache reordering behavior - Flag bit semantics - Serialization format compatibility - Cross-implementation compatibility - All existing tests pass 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent 8009033 commit fd42619

File tree

6 files changed

+739
-156
lines changed

6 files changed

+739
-156
lines changed

dash-spv/src/network/handshake.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -252,8 +252,8 @@ impl HandshakeManager {
252252
.unwrap_or(Duration::from_secs(0))
253253
.as_secs() as i64;
254254

255-
// SPV client doesn't advertise any special services since headers2 is disabled
256-
let services = ServiceFlags::NONE;
255+
// Advertise headers2 support (NODE_HEADERS_COMPRESSED)
256+
let services = ServiceFlags::NONE | NODE_HEADERS_COMPRESSED;
257257

258258
// Parse the local address safely
259259
let local_addr = "127.0.0.1:0"
@@ -310,10 +310,13 @@ impl HandshakeManager {
310310

311311
/// Negotiate headers2 support with the peer after handshake completion.
312312
async fn negotiate_headers2(&self, connection: &mut Peer) -> NetworkResult<()> {
313-
// Headers2 is currently disabled due to protocol compatibility issues
314-
// Always send SendHeaders regardless of peer support
315-
tracing::info!("Headers2 is disabled - sending SendHeaders only");
316-
connection.send_message(NetworkMessage::SendHeaders).await?;
313+
if self.peer_supports_headers2() {
314+
tracing::info!("Peer supports headers2 - sending SendHeaders2");
315+
connection.send_message(NetworkMessage::SendHeaders2).await?;
316+
} else {
317+
tracing::info!("Peer does not support headers2 - sending SendHeaders");
318+
connection.send_message(NetworkMessage::SendHeaders).await?;
319+
}
317320
Ok(())
318321
}
319322
}

dash-spv/src/network/manager.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -423,19 +423,20 @@ impl PeerNetworkManager {
423423
headers.len()
424424
);
425425
// Check if peer supports headers2
426-
// TODO: Re-enable this warning once headers2 is fixed
427-
// Currently suppressed since headers2 is disabled
428-
/*
429426
let peer_guard = peer.read().await;
430-
if peer_guard.peer_info().services.map(|s| {
431-
dashcore::network::constants::ServiceFlags::from(s).has(
432-
dashcore::network::constants::ServiceFlags::from(2048u64)
433-
)
434-
}).unwrap_or(false) {
427+
if peer_guard
428+
.peer_info()
429+
.services
430+
.map(|s| {
431+
dashcore::network::constants::ServiceFlags::from(s).has(
432+
dashcore::network::constants::NODE_HEADERS_COMPRESSED,
433+
)
434+
})
435+
.unwrap_or(false)
436+
{
435437
log::warn!("⚠️ Peer {} supports headers2 but sent regular headers - possible protocol issue", addr);
436438
}
437439
drop(peer_guard);
438-
*/
439440
// Forward to client
440441
}
441442
NetworkMessage::Headers2(headers2) => {
@@ -1296,9 +1297,7 @@ impl NetworkManager for PeerNetworkManager {
12961297
}
12971298

12981299
async fn has_headers2_peer(&self) -> bool {
1299-
// Headers2 is currently disabled due to protocol compatibility issues
1300-
// TODO: Fix headers2 decompression before re-enabling
1301-
false
1300+
self.has_peer_with_service(dashcore::network::constants::NODE_HEADERS_COMPRESSED).await
13021301
}
13031302

13041303
async fn get_last_message_peer_id(&self) -> crate::types::PeerId {

dash-spv/src/sync/headers/manager.rs

Lines changed: 56 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -333,16 +333,8 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
333333
hash
334334
);
335335
vec![hash]
336-
} else if network.has_headers2_peer().await && !self.headers2_failed {
337-
// Check if this is genesis and we're using headers2
338-
let genesis_hash = self.config.network.known_genesis_block_hash();
339-
if genesis_hash == Some(hash) {
340-
tracing::info!("📍 Using empty locator for headers2 genesis sync");
341-
vec![]
342-
} else {
343-
vec![hash]
344-
}
345336
} else {
337+
// Always include the hash in the locator - peer needs to know our position
346338
vec![hash]
347339
}
348340
}
@@ -383,9 +375,8 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
383375
getheaders_msg.stop_hash
384376
);
385377

386-
// Headers2 is currently disabled due to protocol compatibility issues
387-
// TODO: Fix headers2 decompression before re-enabling
388-
let use_headers2 = false; // Disabled until headers2 implementation is fixed
378+
// Use headers2 if peer supports it and we haven't had failures
379+
let use_headers2 = network.has_headers2_peer().await && !self.headers2_failed;
389380

390381
// Log details about the request
391382
tracing::info!(
@@ -456,64 +447,50 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
456447
&mut self,
457448
headers2: dashcore::network::message_headers2::Headers2Message,
458449
peer_id: crate::types::PeerId,
459-
_storage: &mut S,
460-
_network: &mut N,
450+
storage: &mut S,
451+
network: &mut N,
461452
) -> SyncResult<bool> {
462-
tracing::warn!(
463-
"⚠️ Headers2 support is currently NON-FUNCTIONAL. Received {} compressed headers from peer {} but cannot process them.",
453+
tracing::info!(
454+
"📦 Received {} compressed headers from peer {}",
464455
headers2.headers.len(),
465456
peer_id
466457
);
467458

468-
// Mark headers2 as failed for this session to avoid retrying
469-
self.headers2_failed = true;
470-
471-
// Return an error to trigger fallback to regular headers
472-
return Err(SyncError::Headers2DecompressionFailed(
473-
"Headers2 is currently disabled due to protocol compatibility issues".to_string(),
474-
));
475-
476-
#[allow(unreachable_code)]
477-
{
478-
// If this is the first headers2 message, and we need to initialize compression state
479-
if !headers2.headers.is_empty() {
480-
// Check if we need to initialize the compression state
481-
let state = self.headers2_state.get_state(peer_id);
482-
if state.prev_header.is_none() {
483-
// If we're syncing from genesis (height 0), initialize with genesis header
484-
if self.chain_state.read().await.tip_height() == 0 {
485-
// We have genesis header at index 0
486-
if let Some(genesis_header) =
487-
self.chain_state.read().await.header_at_height(0)
488-
{
489-
tracing::info!(
459+
// If this is the first headers2 message, and we need to initialize compression state
460+
if !headers2.headers.is_empty() {
461+
// Check if we need to initialize the compression state
462+
let state = self.headers2_state.get_state(peer_id);
463+
if state.prev_header.is_none() {
464+
// If we're syncing from genesis (height 0), initialize with genesis header
465+
if self.chain_state.read().await.tip_height() == 0 {
466+
// We have genesis header at index 0
467+
if let Some(genesis_header) = self.chain_state.read().await.header_at_height(0)
468+
{
469+
tracing::info!(
490470
"Initializing headers2 compression state for peer {} with genesis header",
491471
peer_id
492472
);
493-
self.headers2_state.init_peer_state(peer_id, *genesis_header);
494-
}
495-
} else if self.chain_state.read().await.tip_height() > 0 {
496-
// Get our current tip to use as the base for compression
497-
if let Some(tip_header) = self.chain_state.read().await.get_tip_header() {
498-
tracing::info!(
473+
self.headers2_state.init_peer_state(peer_id, *genesis_header);
474+
}
475+
} else if self.chain_state.read().await.tip_height() > 0 {
476+
// Get our current tip to use as the base for compression
477+
if let Some(tip_header) = self.chain_state.read().await.get_tip_header() {
478+
tracing::info!(
499479
"Initializing headers2 compression state for peer {} with tip header at height {}",
500480
peer_id,
501481
self.chain_state.read().await.tip_height()
502482
);
503-
self.headers2_state.init_peer_state(peer_id, tip_header);
504-
}
483+
self.headers2_state.init_peer_state(peer_id, tip_header);
505484
}
506485
}
507486
}
487+
}
508488

509-
// Decompress headers using the peer's compression state
510-
let headers = match self
511-
.headers2_state
512-
.process_headers(peer_id, headers2.headers.clone())
513-
{
514-
Ok(headers) => headers,
515-
Err(e) => {
516-
tracing::error!(
489+
// Decompress headers using the peer's compression state
490+
let headers = match self.headers2_state.process_headers(peer_id, headers2.headers.clone()) {
491+
Ok(headers) => headers,
492+
Err(e) => {
493+
tracing::error!(
517494
"Failed to decompress headers2 from peer {}: {}. Headers count: {}, first header compressed: {}, chain height: {}",
518495
peer_id,
519496
e,
@@ -526,37 +503,36 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
526503
self.chain_state.read().await.tip_height()
527504
);
528505

529-
// If we failed due to missing previous header, and we're at genesis,
530-
// this might be a protocol issue where peer expects us to have genesis in compression state
531-
if matches!(e, crate::sync::headers2::ProcessError::DecompressionError(0, _))
532-
&& self.chain_state.read().await.tip_height() == 0
533-
{
534-
tracing::warn!(
506+
// If we failed due to missing previous header, and we're at genesis,
507+
// this might be a protocol issue where peer expects us to have genesis in compression state
508+
if matches!(e, crate::sync::headers2::ProcessError::DecompressionError(0, _))
509+
&& self.chain_state.read().await.tip_height() == 0
510+
{
511+
tracing::warn!(
535512
"Headers2 decompression failed at genesis. Peer may be sending compressed headers that reference genesis. Consider falling back to regular headers."
536513
);
537-
}
538-
539-
// Return a specific error that can trigger fallback
540-
// Mark that headers2 failed for this sync session
541-
self.headers2_failed = true;
542-
return Err(SyncError::Headers2DecompressionFailed(format!(
543-
"Failed to decompress headers: {}",
544-
e
545-
)));
546514
}
547-
};
548515

549-
// Log compression statistics
550-
let stats = self.headers2_state.get_stats();
551-
tracing::info!(
552-
"📊 Headers2 compression stats: {:.1}% bandwidth saved, {:.1}% compression ratio",
553-
stats.bandwidth_savings,
554-
stats.compression_ratio * 100.0
555-
);
516+
// Return a specific error that can trigger fallback
517+
// Mark that headers2 failed for this sync session
518+
self.headers2_failed = true;
519+
return Err(SyncError::Headers2DecompressionFailed(format!(
520+
"Failed to decompress headers: {}",
521+
e
522+
)));
523+
}
524+
};
556525

557-
// Process decompressed headers through the normal flow
558-
self.handle_headers_message(headers, _storage, _network).await
559-
}
526+
// Log compression statistics
527+
let stats = self.headers2_state.get_stats();
528+
tracing::info!(
529+
"📊 Headers2 compression stats: {:.1}% bandwidth saved, {:.1}% compression ratio",
530+
stats.bandwidth_savings,
531+
stats.compression_ratio * 100.0
532+
);
533+
534+
// Process decompressed headers through the normal flow
535+
self.handle_headers_message(headers, storage, network).await
560536
}
561537

562538
/// Prepare sync state without sending network requests.

dash-spv/src/sync/message_handlers.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ impl<
312312
// Update phase state and check if we need to transition
313313
let should_transition = if let SyncPhase::DownloadingHeaders {
314314
current_height,
315-
315+
received_empty_response,
316316
last_progress,
317317
..
318318
} = &mut self.current_phase
@@ -323,11 +323,16 @@ impl<
323323
// Note: We can't easily track headers_downloaded for compressed headers
324324
// without decompressing first, so we rely on the header sync manager's internal stats
325325

326+
// Mark sync complete if continue_sync is false (no more headers)
327+
if !continue_sync {
328+
*received_empty_response = true;
329+
}
330+
326331
// Update progress time
327332
*last_progress = Instant::now();
328333

329334
// Check if phase is complete
330-
!continue_sync
335+
!continue_sync || *received_empty_response
331336
} else {
332337
false
333338
};

0 commit comments

Comments
 (0)