Skip to content
15 changes: 9 additions & 6 deletions dash-spv/src/network/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ impl HandshakeManager {
.unwrap_or(Duration::from_secs(0))
.as_secs() as i64;

// SPV client doesn't advertise any special services since headers2 is disabled
let services = ServiceFlags::NONE;
// Advertise headers2 support (NODE_HEADERS_COMPRESSED)
let services = ServiceFlags::NONE | NODE_HEADERS_COMPRESSED;

// Parse the local address safely
let local_addr = "127.0.0.1:0"
Expand Down Expand Up @@ -310,10 +310,13 @@ impl HandshakeManager {

/// Negotiate headers2 support with the peer after handshake completion.
async fn negotiate_headers2(&self, connection: &mut Peer) -> NetworkResult<()> {
// Headers2 is currently disabled due to protocol compatibility issues
// Always send SendHeaders regardless of peer support
tracing::info!("Headers2 is disabled - sending SendHeaders only");
connection.send_message(NetworkMessage::SendHeaders).await?;
if self.peer_supports_headers2() {
tracing::info!("Peer supports headers2 - sending SendHeaders2");
connection.send_message(NetworkMessage::SendHeaders2).await?;
} else {
tracing::info!("Peer does not support headers2 - sending SendHeaders");
connection.send_message(NetworkMessage::SendHeaders).await?;
}
Ok(())
}
}
56 changes: 44 additions & 12 deletions dash-spv/src/network/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,19 +423,20 @@ impl PeerNetworkManager {
headers.len()
);
// Check if peer supports headers2
// TODO: Re-enable this warning once headers2 is fixed
// Currently suppressed since headers2 is disabled
/*
let peer_guard = peer.read().await;
if peer_guard.peer_info().services.map(|s| {
dashcore::network::constants::ServiceFlags::from(s).has(
dashcore::network::constants::ServiceFlags::from(2048u64)
)
}).unwrap_or(false) {
if peer_guard
.peer_info()
.services
.map(|s| {
dashcore::network::constants::ServiceFlags::from(s).has(
dashcore::network::constants::NODE_HEADERS_COMPRESSED,
)
})
.unwrap_or(false)
{
log::warn!("⚠️ Peer {} supports headers2 but sent regular headers - possible protocol issue", addr);
}
drop(peer_guard);
*/
// Forward to client
}
NetworkMessage::Headers2(headers2) => {
Expand Down Expand Up @@ -779,6 +780,7 @@ impl PeerNetworkManager {
// For filter-related messages, we need a peer that supports compact filters
let requires_compact_filters =
matches!(&message, NetworkMessage::GetCFHeaders(_) | NetworkMessage::GetCFilters(_));
let requires_headers2 = matches!(&message, NetworkMessage::GetHeaders2(_));

let selected_peer = if requires_compact_filters {
// Find a peer that supports compact filters
Expand Down Expand Up @@ -806,6 +808,37 @@ impl PeerNetworkManager {
));
}
}
} else if requires_headers2 {
// Prefer a peer that advertises headers2 support
let mut current_sync_peer = self.current_sync_peer.lock().await;
let mut selected: Option<SocketAddr> = None;

if let Some(current_addr) = *current_sync_peer {
if let Some((_, peer)) = peers.iter().find(|(addr, _)| *addr == current_addr) {
let peer_guard = peer.read().await;
if peer_guard.peer_info().supports_headers2() {
selected = Some(current_addr);
}
}
}

if selected.is_none() {
for (addr, peer) in &peers {
let peer_guard = peer.read().await;
if peer_guard.peer_info().supports_headers2() {
selected = Some(*addr);
break;
}
}
}

let chosen = selected.unwrap_or(peers[0].0);
if Some(chosen) != *current_sync_peer {
log::info!("Sync peer selected for Headers2: {}", chosen);
*current_sync_peer = Some(chosen);
}
drop(current_sync_peer);
chosen
Comment on lines +811 to +841
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

rg -n "GetHeaders2" --type rust -C 3 dash-spv/src/network/

Repository: dashpay/rust-dashcore

Length of output: 3715


🏁 Script executed:

# Check the broader context of the peer selection logic
sed -n '800,850p' dash-spv/src/network/manager.rs

Repository: dashpay/rust-dashcore

Length of output: 2308


🏁 Script executed:

# Search for error handling related to unsupported messages or headers2
rg -n "supports_headers2|GetHeaders2|protocol.*error" --type rust dash-spv/src/network/ | head -40

Repository: dashpay/rust-dashcore

Length of output: 1317


🏁 Script executed:

# Check what happens after choose_sync_peer_for_headers is called
rg -n "choose_sync_peer_for_headers" --type rust -B 2 -A 5 dash-spv/src/network/

Repository: dashpay/rust-dashcore

Length of output: 47


🏁 Script executed:

# Check how responses are handled and if there's error handling for unsupported messages
rg -n "Headers2|handle_response|response.*error" --type rust dash-spv/src/network/ | head -50

Repository: dashpay/rust-dashcore

Length of output: 3301


🏁 Script executed:

# Look at the compact filter error handling pattern more closely
sed -n '765,805p' dash-spv/src/network/manager.rs

Repository: dashpay/rust-dashcore

Length of output: 1661


🏁 Script executed:

# Check if peers[0] is guaranteed to exist at line 835
sed -n '760,790p' dash-spv/src/network/manager.rs

Repository: dashpay/rust-dashcore

Length of output: 1423


🏁 Script executed:

# Check what happens in the broader send_message context - is there error handling?
sed -n '750,770p' dash-spv/src/network/manager.rs

Repository: dashpay/rust-dashcore

Length of output: 850


🏁 Script executed:

# Check for timeout/retry logic related to headers2 or message responses
rg -n "timeout|retry|wait.*Headers2|select.*timeout" --type rust dash-spv/src/network/manager.rs | head -30

Repository: dashpay/rust-dashcore

Length of output: 455


🏁 Script executed:

# Check the function that calls send_to_single_peer to see if there's response handling
sed -n '680,750p' dash-spv/src/network/manager.rs

Repository: dashpay/rust-dashcore

Length of output: 4042


Fix fallback behavior for GetHeaders2 peer selection.

The code falls back to peers[0] when no headers2-supporting peer is found (line 835), but that peer may not support headers2, so it can receive a GetHeaders2 message it does not understand and respond with a protocol error (or silently ignore the request, stalling sync). This contradicts the pattern used for compact filters (lines 779-802), which correctly returns an error when no compatible peer exists.

Replace the fallback with error handling:

let chosen = selected.ok_or_else(|| {
    log::warn!("No peers support headers2, cannot send GetHeaders2");
    NetworkError::ProtocolError("No peers support headers2".to_string())
})?;

This ensures GetHeaders2 is only sent to peers that advertise headers2 support, consistent with the compact filter request handling.

} else {
// For non-filter messages, use the sticky sync peer
let mut current_sync_peer = self.current_sync_peer.lock().await;
Expand Down Expand Up @@ -1060,6 +1093,7 @@ impl NetworkManager for PeerNetworkManager {
// For sync messages that require consistent responses, send to only one peer
match &message {
NetworkMessage::GetHeaders(_)
| NetworkMessage::GetHeaders2(_)
| NetworkMessage::GetCFHeaders(_)
| NetworkMessage::GetCFilters(_)
| NetworkMessage::GetData(_)
Expand Down Expand Up @@ -1296,9 +1330,7 @@ impl NetworkManager for PeerNetworkManager {
}

async fn has_headers2_peer(&self) -> bool {
// Headers2 is currently disabled due to protocol compatibility issues
// TODO: Fix headers2 decompression before re-enabling
false
self.has_peer_with_service(dashcore::network::constants::NODE_HEADERS_COMPRESSED).await
}

async fn get_last_message_peer_id(&self) -> crate::types::PeerId {
Expand Down
157 changes: 49 additions & 108 deletions dash-spv/src/sync/headers/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,32 +318,7 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
base_hash: Option<BlockHash>,
) -> SyncResult<()> {
let block_locator = match base_hash {
Some(hash) => {
// When syncing from a checkpoint, we need to create a proper locator
// that helps the peer understand we want headers AFTER this point
if self.is_synced_from_checkpoint() {
// For checkpoint sync, only include the checkpoint hash
// Including genesis would allow peers to fall back to sending headers from genesis
// if they don't recognize the checkpoint, which is exactly what we want to avoid
tracing::debug!(
"📍 Using checkpoint-only locator for height {}: [{}]",
self.get_sync_base_height(),
hash
);
vec![hash]
} else if network.has_headers2_peer().await && !self.headers2_failed {
// Check if this is genesis and we're using headers2
let genesis_hash = self.config.network.known_genesis_block_hash();
if genesis_hash == Some(hash) {
tracing::info!("📍 Using empty locator for headers2 genesis sync");
vec![]
} else {
vec![hash]
}
} else {
vec![hash]
}
}
Some(hash) => vec![hash],
None => {
// Check if we're syncing from a checkpoint
if self.is_synced_from_checkpoint()
Expand Down Expand Up @@ -381,9 +356,8 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
getheaders_msg.stop_hash
);

// Headers2 is currently disabled due to protocol compatibility issues
// TODO: Fix headers2 decompression before re-enabling
let use_headers2 = false; // Disabled until headers2 implementation is fixed
// Use headers2 if peer supports it and we haven't had failures
let use_headers2 = network.has_headers2_peer().await && !self.headers2_failed;

// Log details about the request
tracing::info!(
Expand Down Expand Up @@ -454,64 +428,39 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
&mut self,
headers2: &dashcore::network::message_headers2::Headers2Message,
peer_id: crate::types::PeerId,
_storage: &mut S,
_network: &mut N,
) -> SyncResult<bool> {
tracing::warn!(
"⚠️ Headers2 support is currently NON-FUNCTIONAL. Received {} compressed headers from peer {} but cannot process them.",
storage: &mut S,
network: &mut N,
) -> SyncResult<(bool, usize)> {
tracing::info!(
"📦 Received {} compressed headers from peer {}",
headers2.headers.len(),
peer_id
);

// Mark headers2 as failed for this session to avoid retrying
self.headers2_failed = true;

// Return an error to trigger fallback to regular headers
return Err(SyncError::Headers2DecompressionFailed(
"Headers2 is currently disabled due to protocol compatibility issues".to_string(),
));

#[allow(unreachable_code)]
{
// If this is the first headers2 message, and we need to initialize compression state
if !headers2.headers.is_empty() {
// Check if we need to initialize the compression state
let state = self.headers2_state.get_state(peer_id);
if state.prev_header.is_none() {
// If we're syncing from genesis (height 0), initialize with genesis header
if self.chain_state.read().await.tip_height() == 0 {
// We have genesis header at index 0
if let Some(genesis_header) =
self.chain_state.read().await.header_at_height(0)
{
tracing::info!(
"Initializing headers2 compression state for peer {} with genesis header",
peer_id
);
self.headers2_state.init_peer_state(peer_id, *genesis_header);
}
} else if self.chain_state.read().await.tip_height() > 0 {
// Get our current tip to use as the base for compression
if let Some(tip_header) = self.chain_state.read().await.get_tip_header() {
tracing::info!(
"Initializing headers2 compression state for peer {} with tip header at height {}",
peer_id,
self.chain_state.read().await.tip_height()
);
self.headers2_state.init_peer_state(peer_id, tip_header);
}
}
// If this is the first headers2 message, and we need to initialize compression state
if !headers2.headers.is_empty() {
// Check if we need to initialize the compression state
let state = self.headers2_state.get_state(peer_id);
if state.prev_header.is_none() {
// Initialize with header at current tip height (works for both genesis and later)
let chain_state = self.chain_state.read().await;
let tip_height = chain_state.tip_height();
if let Some(tip_header) = chain_state.header_at_height(tip_height) {
tracing::info!(
"Initializing headers2 compression state for peer {} with header at height {}",
peer_id,
tip_height
);
self.headers2_state.init_peer_state(peer_id, *tip_header);
}
}
}

// Decompress headers using the peer's compression state
let headers = match self
.headers2_state
.process_headers(peer_id, headers2.headers.clone())
{
Ok(headers) => headers,
Err(e) => {
tracing::error!(
// Decompress headers using the peer's compression state
let headers = match self.headers2_state.process_headers(peer_id, &headers2.headers) {
Ok(headers) => headers,
Err(e) => {
tracing::error!(
"Failed to decompress headers2 from peer {}: {}. Headers count: {}, first header compressed: {}, chain height: {}",
peer_id,
e,
Expand All @@ -524,37 +473,29 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
self.chain_state.read().await.tip_height()
);

// If we failed due to missing previous header, and we're at genesis,
// this might be a protocol issue where peer expects us to have genesis in compression state
if matches!(e, crate::sync::headers2::ProcessError::DecompressionError(0, _))
&& self.chain_state.read().await.tip_height() == 0
{
tracing::warn!(
"Headers2 decompression failed at genesis. Peer may be sending compressed headers that reference genesis. Consider falling back to regular headers."
);
}
// Mark that headers2 failed for this sync session to trigger fallback to regular headers
self.headers2_failed = true;
return Err(SyncError::Headers2DecompressionFailed(format!(
"Failed to decompress headers: {}",
e
)));
}
};

// Return a specific error that can trigger fallback
// Mark that headers2 failed for this sync session
self.headers2_failed = true;
return Err(SyncError::Headers2DecompressionFailed(format!(
"Failed to decompress headers: {}",
e
)));
}
};
// Log compression statistics
let stats = self.headers2_state.get_stats();
tracing::info!(
"📊 Headers2 compression stats: {:.1}% bandwidth saved, {:.1}% compression ratio",
stats.bandwidth_savings,
stats.compression_ratio * 100.0
);

// Log compression statistics
let stats = self.headers2_state.get_stats();
tracing::info!(
"📊 Headers2 compression stats: {:.1}% bandwidth saved, {:.1}% compression ratio",
stats.bandwidth_savings,
stats.compression_ratio * 100.0
);
let headers_count = headers.len();

// Process decompressed headers through the normal flow
self.handle_headers_message(&headers, _storage, _network).await
}
// Process decompressed headers through the normal flow
let continue_sync = self.handle_headers_message(&headers, storage, network).await?;

Ok((continue_sync, headers_count))
}

/// Prepare sync state without sending network requests.
Expand Down
13 changes: 6 additions & 7 deletions dash-spv/src/sync/headers2/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl Headers2StateManager {
pub fn process_headers(
&mut self,
peer_id: PeerId,
headers: Vec<CompressedHeader>,
headers: &[CompressedHeader],
) -> Result<Vec<Header>, ProcessError> {
if headers.is_empty() {
return Ok(Vec::new());
Expand All @@ -116,7 +116,7 @@ impl Headers2StateManager {
let mut decompressed = Vec::with_capacity(headers.len());

// Process headers and collect statistics
for (i, compressed) in headers.into_iter().enumerate() {
for (i, compressed) in headers.iter().enumerate() {
// Update statistics
self.total_headers_received += 1;
self.total_bytes_received += compressed.encoded_size() as u64;
Expand All @@ -128,9 +128,8 @@ impl Headers2StateManager {

// Get state and decompress
let state = self.get_state(peer_id);
let header = state
.decompress(&compressed)
.map_err(|e| ProcessError::DecompressionError(i, e))?;
let header =
state.decompress(compressed).map_err(|e| ProcessError::DecompressionError(i, e))?;

decompressed.push(header);
}
Expand Down Expand Up @@ -230,7 +229,7 @@ mod tests {
let compressed2 = compress_state.compress(&header2);

// Process headers
let result = manager.process_headers(peer_id, vec![compressed1, compressed2]);
let result = manager.process_headers(peer_id, &[compressed1, compressed2]);
assert!(result.is_ok());

let decompressed = result.expect("decompression should succeed in test");
Expand Down Expand Up @@ -261,7 +260,7 @@ mod tests {

// Try to process it as first header - should fail with DecompressionError
// because the peer doesn't have the previous header state
let result = manager.process_headers(peer_id, vec![compressed]);
let result = manager.process_headers(peer_id, &[compressed]);
assert!(matches!(result, Err(ProcessError::DecompressionError(0, _))));
}

Expand Down
Loading