From 3a8b312fedd359222bc72e0ad053323358e54687 Mon Sep 17 00:00:00 2001 From: pasta Date: Sun, 16 Nov 2025 16:10:18 -0600 Subject: [PATCH 1/2] refactor: reduce needs on cs_main in network processing --- src/net_processing.cpp | 1094 +++++++++++++++++++++++----------------- 1 file changed, 637 insertions(+), 457 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 9808fb9f646a7..71b8dfa5050ea 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -432,7 +432,10 @@ struct Peer { bool m_can_tx_relay GUARDED_BY(m_tx_relay_mutex) {false}; }; +struct CNodeState; + using PeerRef = std::shared_ptr; +using NodeStateRef = std::shared_ptr; /** * Maintain validation-specific state about nodes, protected by cs_main, instead @@ -441,36 +444,37 @@ using PeerRef = std::shared_ptr; * and we're no longer holding the node's locks. */ struct CNodeState { + mutable Mutex m_mutex; //! The best known block we know this peer has announced. - const CBlockIndex* pindexBestKnownBlock{nullptr}; + const CBlockIndex* pindexBestKnownBlock GUARDED_BY(m_mutex){nullptr}; //! The hash of the last unknown block this peer has announced. - uint256 hashLastUnknownBlock{}; + uint256 hashLastUnknownBlock GUARDED_BY(m_mutex){}; //! The last full block we both have. - const CBlockIndex* pindexLastCommonBlock{nullptr}; + const CBlockIndex* pindexLastCommonBlock GUARDED_BY(m_mutex){nullptr}; //! The best header we have sent our peer. - const CBlockIndex* pindexBestHeaderSent{nullptr}; + const CBlockIndex* pindexBestHeaderSent GUARDED_BY(m_mutex){nullptr}; //! Length of current-streak of unconnecting headers announcements - int nUnconnectingHeaders{0}; + int nUnconnectingHeaders GUARDED_BY(m_mutex){0}; //! Whether we've started headers synchronization with this peer. - bool fSyncStarted{false}; + bool fSyncStarted GUARDED_BY(m_mutex){false}; //! When to potentially disconnect peer for stalling headers download - std::chrono::microseconds m_headers_sync_timeout{0us}; + std::chrono::microseconds m_headers_sync_timeout GUARDED_BY(m_mutex){0us}; //! Since when we're stalling block download progress (in microseconds), or 0. - std::chrono::microseconds m_stalling_since{0us}; - std::list vBlocksInFlight; + std::chrono::microseconds m_stalling_since GUARDED_BY(m_mutex){0us}; + std::list vBlocksInFlight GUARDED_BY(m_mutex); //! When the first entry in vBlocksInFlight started downloading. Don't care when vBlocksInFlight is empty. - std::chrono::microseconds m_downloading_since{0us}; - int nBlocksInFlight{0}; + std::chrono::microseconds m_downloading_since GUARDED_BY(m_mutex){0us}; + int nBlocksInFlight GUARDED_BY(m_mutex){0}; //! Whether we consider this a preferred download peer. - bool fPreferredDownload{false}; + bool fPreferredDownload GUARDED_BY(m_mutex){false}; //! Whether this peer wants invs or headers (when possible) for block announcements. - bool fPreferHeaders{false}; + bool fPreferHeaders GUARDED_BY(m_mutex){false}; //! Whether this peer wants invs or compressed headers (when possible) for block announcements. - bool fPreferHeadersCompressed{false}; + bool fPreferHeadersCompressed GUARDED_BY(m_mutex){false}; /** Whether this peer wants invs or cmpctblocks (when possible) for block announcements. */ - bool m_requested_hb_cmpctblocks{false}; + bool m_requested_hb_cmpctblocks GUARDED_BY(m_mutex){false}; /** Whether this peer will send us cmpctblocks if we request them. */ - bool m_provides_cmpctblocks{false}; + std::atomic m_provides_cmpctblocks{false}; /** State used to enforce CHAIN_SYNC_TIMEOUT and EXTRA_PEER_CHECK_INTERVAL logic. * @@ -507,10 +511,10 @@ struct CNodeState { bool m_protect{false}; }; - ChainSyncTimeoutState m_chain_sync; + ChainSyncTimeoutState m_chain_sync GUARDED_BY(m_mutex); //! Time of last new block announcement - int64_t m_last_block_announcement{0}; + int64_t m_last_block_announcement GUARDED_BY(m_mutex){0}; /* * State associated with objects download. @@ -573,17 +577,22 @@ struct CNodeState { std::chrono::microseconds m_check_expiry_timer{0}; }; - ObjectDownloadState m_object_download; + ObjectDownloadState m_object_download GUARDED_BY(m_mutex); //! Whether this peer is an inbound connection const bool m_is_inbound; //! A rolling bloom filter of all announced tx CInvs to this peer. - CRollingBloomFilter m_recently_announced_invs = CRollingBloomFilter{INVENTORY_MAX_RECENT_RELAY, 0.000001}; + CRollingBloomFilter m_recently_announced_invs GUARDED_BY(m_mutex){INVENTORY_MAX_RECENT_RELAY, 0.000001}; CNodeState(bool is_inbound) : m_is_inbound(is_inbound) {} }; +// Keeps track of the time (in microseconds) when transactions were requested last time +unordered_limitedmap g_already_asked_for(MAX_INV_SZ, MAX_INV_SZ * 2); +unordered_limitedmap g_erased_object_requests(MAX_INV_SZ, MAX_INV_SZ * 2); +Mutex g_object_request_mutex; + class PeerManagerImpl final : public PeerManager { public: @@ -607,25 +616,24 @@ class PeerManagerImpl final : public PeerManager EXCLUSIVE_LOCKS_REQUIRED(!m_recent_confirmed_transactions_mutex); void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); - void BlockChecked(const CBlock& block, const BlockValidationState& state) override - EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + void BlockChecked(const CBlock& block, const BlockValidationState& state) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_node_states_mutex); void NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr& pblock) override EXCLUSIVE_LOCKS_REQUIRED(!m_most_recent_block_mutex); /** Implement NetEventsInterface */ - void InitializeNode(CNode& node, ServiceFlags our_services) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); - void FinalizeNode(const CNode& node) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + void InitializeNode(CNode& node, ServiceFlags our_services) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_node_states_mutex); + void FinalizeNode(const CNode& node) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_node_states_mutex); bool ProcessMessages(CNode* pfrom, std::atomic& interrupt) override - EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, g_msgproc_mutex); + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, g_msgproc_mutex, !m_node_states_mutex, !g_object_request_mutex); bool SendMessages(CNode* pto) override - EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, g_msgproc_mutex); + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, g_msgproc_mutex, !g_object_request_mutex, !m_node_states_mutex); /** Implement PeerManager */ void StartScheduledTasks(CScheduler& scheduler) override; void CheckForStaleTipAndEvictPeers() override; std::optional FetchBlock(NodeId peer_id, const CBlockIndex& block_index) override - EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); - bool GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats) const override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_node_states_mutex); + bool GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats) const override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_node_states_mutex); bool IgnoresIncomingTxs() override { return m_ignore_incoming_txs; } void SendPings() override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);; void PushInventory(NodeId nodeid, const CInv& inv) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); @@ -638,10 +646,10 @@ class PeerManagerImpl final : public PeerManager void Misbehaving(const NodeId pnode, const int howmuch, const std::string& message = "") override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv, const std::chrono::microseconds time_received, const std::atomic& interruptMsgProc) override - EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, g_msgproc_mutex); - void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) override; + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, g_msgproc_mutex, !m_node_states_mutex, !g_object_request_mutex); + void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) override EXCLUSIVE_LOCKS_REQUIRED(!m_node_states_mutex); bool IsBanned(NodeId pnode) override EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_peer_mutex); - size_t GetRequestedObjectCount(NodeId nodeid) const override EXCLUSIVE_LOCKS_REQUIRED(::cs_main); + size_t GetRequestedObjectCount(NodeId nodeid) const override EXCLUSIVE_LOCKS_REQUIRED(!m_node_states_mutex); /** Implements external handlers logic */ void AddExtraHandler(std::unique_ptr&& handler) override; @@ -661,7 +669,7 @@ class PeerManagerImpl final : public PeerManager void _RelayTransaction(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_peer_mutex); /** Ask peers that have a transaction in their inventory to relay it to us. */ - void AskPeersForTransaction(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + void AskPeersForTransaction(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !g_object_request_mutex, !m_node_states_mutex); /** Relay inventories to peers that find it relevant */ void RelayInvFiltered(const CInv& inv, const CTransaction& relatedTx) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); @@ -672,16 +680,16 @@ class PeerManagerImpl final : public PeerManager */ void RelayInvFiltered(const CInv& inv, const uint256& relatedTxHash) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); - void EraseObjectRequest(NodeId nodeid, const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(::cs_main); + void EraseObjectRequest(NodeId nodeid, const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(!g_object_request_mutex, !m_node_states_mutex); void RequestObject(NodeId nodeid, const CInv& inv, std::chrono::microseconds current_time, bool fForce = false) - EXCLUSIVE_LOCKS_REQUIRED(::cs_main); + EXCLUSIVE_LOCKS_REQUIRED(!g_object_request_mutex, !m_node_states_mutex); /** Helper to process result of external handlers of message */ - void PostProcessMessage(MessageProcessingResult&& ret, NodeId node) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + void PostProcessMessage(MessageProcessingResult&& ret, NodeId node) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !g_object_request_mutex, !m_node_states_mutex); /** Consider evicting an outbound peer based on the amount of time they've been behind our tip */ - void ConsiderEviction(CNode& pto, Peer& peer, std::chrono::seconds time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_msgproc_mutex); + void ConsiderEviction(CNode& pto, Peer& peer, std::chrono::seconds time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_msgproc_mutex, !m_node_states_mutex); /** If we have extra outbound peers, try to disconnect the one with the oldest block announcement */ void EvictExtraOutboundPeers(std::chrono::seconds now) EXCLUSIVE_LOCKS_REQUIRED(cs_main); @@ -748,8 +756,7 @@ class PeerManagerImpl final : public PeerManager /** Process a single headers message from a peer. */ void ProcessHeadersMessage(CNode& pfrom, Peer& peer, const std::vector& headers, - bool via_compact_block) - EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex); + bool via_compact_block) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex, !m_node_states_mutex); [[nodiscard]] MessageProcessingResult ProcessPlatformBanMessage(NodeId node, std::string_view msg_type, CDataStream& vRecv) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex); @@ -758,7 +765,7 @@ class PeerManagerImpl final : public PeerManager * occasional non-connecting header (this can happen due to BIP 130 headers * announcements for blocks interacting with the 2hr (MAX_FUTURE_BLOCK_TIME) rule). */ void HandleFewUnconnectingHeaders(CNode& pfrom, Peer& peer, const std::vector& headers) - EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex); + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex, !m_node_states_mutex); /** Return true if the headers connect to each other, false otherwise */ bool CheckHeadersAreContinuous(const std::vector& headers) const; /** Request further headers from this peer with a given locator. @@ -768,9 +775,9 @@ class PeerManagerImpl final : public PeerManager bool MaybeSendGetHeaders(CNode& pfrom, const std::string& msg_type, const CBlockLocator& locator, Peer& peer) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); /** Potentially fetch blocks from this peer upon receipt of a new headers tip */ - void HeadersDirectFetchBlocks(CNode& pfrom, const Peer& peer, const CBlockIndex* pindexLast); + void HeadersDirectFetchBlocks(CNode& pfrom, const Peer& peer, const CBlockIndex* pindexLast) EXCLUSIVE_LOCKS_REQUIRED(!m_node_states_mutex); /** Update peer state based on received headers message */ - void UpdatePeerStateForReceivedHeaders(CNode& pfrom, const CBlockIndex *pindexLast, bool received_new_header, bool may_have_more_headers); + void UpdatePeerStateForReceivedHeaders(CNode& pfrom, const CBlockIndex *pindexLast, bool received_new_header, bool may_have_more_headers) EXCLUSIVE_LOCKS_REQUIRED(!m_node_states_mutex); void SendBlockTransactions(CNode& pfrom, const CBlock& block, const BlockTransactionsRequest& req) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); @@ -844,20 +851,22 @@ class PeerManagerImpl final : public PeerManager */ std::map m_peer_map GUARDED_BY(m_peer_mutex); + /** Protects m_node_states. */ + mutable Mutex m_node_states_mutex; /** Map maintaining per-node state. */ - std::map m_node_states GUARDED_BY(cs_main); + std::map m_node_states GUARDED_BY(m_node_states_mutex); - /** Get a pointer to a const CNodeState, used when not mutating the CNodeState object. */ - const CNodeState* State(NodeId pnode) const EXCLUSIVE_LOCKS_REQUIRED(cs_main); - /** Get a pointer to a mutable CNodeState. */ - CNodeState* State(NodeId pnode) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + /** Get a shared pointer to a CNodeState. */ + NodeStateRef State(NodeId pnode) const EXCLUSIVE_LOCKS_REQUIRED(!m_node_states_mutex); + /** Get a shared pointer to a mutable CNodeState. */ + NodeStateRef State(NodeId pnode) EXCLUSIVE_LOCKS_REQUIRED(!m_node_states_mutex); std::atomic m_next_inv_to_inbounds{0us}; /** Check whether the last unknown block a peer advertised is not yet known. */ - void ProcessBlockAvailability(NodeId nodeid) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + void ProcessBlockAvailability(NodeId nodeid) EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_node_states_mutex); /** Update tracking information about which blocks a peer is assumed to have. */ - void UpdateBlockAvailability(NodeId nodeid, const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + void UpdateBlockAvailability(NodeId nodeid, const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_node_states_mutex); bool CanDirectFetch() EXCLUSIVE_LOCKS_REQUIRED(cs_main); /** @@ -1027,20 +1036,20 @@ class PeerManagerImpl final : public PeerManager * - the block has been recieved from a peer * - the request for the block has timed out */ - void RemoveBlockRequest(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + void RemoveBlockRequest(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_node_states_mutex); /* Mark a block as in flight * Returns false, still setting pit, if the block was already in flight from the same peer * pit will only be valid as long as the same cs_main lock is being held */ - bool BlockRequested(NodeId nodeid, const CBlockIndex& block, std::list::iterator** pit = nullptr) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + bool BlockRequested(NodeId nodeid, const CBlockIndex& block, std::list::iterator** pit = nullptr) EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_node_states_mutex); bool TipMayBeStale() EXCLUSIVE_LOCKS_REQUIRED(cs_main); /** Update pindexLastCommonBlock and add not-in-flight missing successors to vBlocks, until it has * at most count entries. */ - void FindNextBlocksToDownload(const Peer& peer, unsigned int count, std::vector& vBlocks, NodeId& nodeStaller) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + void FindNextBlocksToDownload(const Peer& peer, unsigned int count, std::vector& vBlocks, NodeId& nodeStaller) EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_node_states_mutex); std::map::iterator> > mapBlocksInFlight GUARDED_BY(cs_main); @@ -1048,10 +1057,10 @@ class PeerManagerImpl final : public PeerManager std::atomic m_last_tip_update{0s}; /** Determine whether or not a peer can request a transaction, and return it (or nullptr if not found or not allowed). */ - CTransactionRef FindTxForGetData(const CNode* peer, const uint256& txid, const std::chrono::seconds mempool_req, const std::chrono::seconds now) LOCKS_EXCLUDED(cs_main); + CTransactionRef FindTxForGetData(const CNode* peer, const uint256& txid, const std::chrono::seconds mempool_req, const std::chrono::seconds now) LOCKS_EXCLUDED(cs_main) EXCLUSIVE_LOCKS_REQUIRED(!m_node_states_mutex); void ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic& interruptMsgProc) - EXCLUSIVE_LOCKS_REQUIRED(!m_most_recent_block_mutex, peer.m_getdata_requests_mutex) LOCKS_EXCLUDED(::cs_main); + EXCLUSIVE_LOCKS_REQUIRED(!m_most_recent_block_mutex, peer.m_getdata_requests_mutex, !m_node_states_mutex) LOCKS_EXCLUDED(::cs_main); /** Process a new block. Perform any post-processing housekeeping */ void ProcessBlock(CNode& from, const std::shared_ptr& pblock, bool force_processing); @@ -1068,10 +1077,12 @@ class PeerManagerImpl final : public PeerManager * lNodesAnnouncingHeaderAndIDs, and keeping that list under a certain size by * removing the first element if necessary. */ - void MaybeSetPeerAsAnnouncingHeaderAndIDs(NodeId nodeid) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + void MaybeSetPeerAsAnnouncingHeaderAndIDs(NodeId nodeid) EXCLUSIVE_LOCKS_REQUIRED(!m_node_states_mutex, !m_header_ann_mutex); + /** Mutex protecting lNodesAnnouncingHeaderAndIDs. */ + mutable Mutex m_header_ann_mutex; /** Stack of nodes which we have set to announce using compact blocks */ - std::list lNodesAnnouncingHeaderAndIDs GUARDED_BY(cs_main); + std::list lNodesAnnouncingHeaderAndIDs GUARDED_BY(m_header_ann_mutex); /** Number of peers from which we're downloading blocks. */ int m_peers_downloading_from GUARDED_BY(cs_main) = 0; @@ -1091,21 +1102,24 @@ class PeerManagerImpl final : public PeerManager std::vector> m_handlers; }; -// Keeps track of the time (in microseconds) when transactions were requested last time -unordered_limitedmap g_already_asked_for(MAX_INV_SZ, MAX_INV_SZ * 2); -unordered_limitedmap g_erased_object_requests(MAX_INV_SZ, MAX_INV_SZ * 2); - -const CNodeState* PeerManagerImpl::State(NodeId pnode) const EXCLUSIVE_LOCKS_REQUIRED(cs_main) +NodeStateRef PeerManagerImpl::State(NodeId pnode) const EXCLUSIVE_LOCKS_REQUIRED(!m_node_states_mutex) { - std::map::const_iterator it = m_node_states.find(pnode); - if (it == m_node_states.end()) - return nullptr; - return &it->second; + LOCK(m_node_states_mutex); + auto it = m_node_states.find(pnode); + if (it == m_node_states.end()) { + return {}; + } + return it->second; } -CNodeState* PeerManagerImpl::State(NodeId pnode) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +NodeStateRef PeerManagerImpl::State(NodeId pnode) EXCLUSIVE_LOCKS_REQUIRED(!m_node_states_mutex) { - return const_cast(std::as_const(*this).State(pnode)); + LOCK(m_node_states_mutex); + auto it = m_node_states.find(pnode); + if (it == m_node_states.end()) { + return {}; + } + return it->second; } /** @@ -1225,8 +1239,9 @@ bool PeerManagerImpl::IsBlockRequested(const uint256& hash) return mapBlocksInFlight.find(hash) != mapBlocksInFlight.end(); } -void PeerManagerImpl::RemoveBlockRequest(const uint256& hash) +void PeerManagerImpl::RemoveBlockRequest(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_node_states_mutex) { + AssertLockHeld(cs_main); auto it = mapBlocksInFlight.find(hash); if (it == mapBlocksInFlight.end()) { // Block was not requested @@ -1234,29 +1249,33 @@ void PeerManagerImpl::RemoveBlockRequest(const uint256& hash) } auto [node_id, list_it] = it->second; - CNodeState *state = State(node_id); + NodeStateRef state = State(node_id); assert(state != nullptr); - if (state->vBlocksInFlight.begin() == list_it) { - // First block on the queue was received, update the start download time for the next one - state->m_downloading_since = std::max(state->m_downloading_since, GetTime()); - } - state->vBlocksInFlight.erase(list_it); + { + LOCK(state->m_mutex); + if (state->vBlocksInFlight.begin() == list_it) { + // First block on the queue was received, update the start download time for the next one + state->m_downloading_since = std::max(state->m_downloading_since, GetTime()); + } + state->vBlocksInFlight.erase(list_it); - state->nBlocksInFlight--; - if (state->nBlocksInFlight == 0) { - // Last validated block on the queue was received. - m_peers_downloading_from--; + state->nBlocksInFlight--; + if (state->nBlocksInFlight == 0) { + // Last validated block on the queue was received. + m_peers_downloading_from--; + } + state->m_stalling_since = 0us; } - state->m_stalling_since = 0us; mapBlocksInFlight.erase(it); } -bool PeerManagerImpl::BlockRequested(NodeId nodeid, const CBlockIndex& block, std::list::iterator **pit) +bool PeerManagerImpl::BlockRequested(NodeId nodeid, const CBlockIndex& block, std::list::iterator **pit) EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_node_states_mutex) { + AssertLockHeld(cs_main); const uint256& hash{block.GetBlockHash()}; - CNodeState *state = State(nodeid); + NodeStateRef state = State(nodeid); assert(state != nullptr); // Short-circuit most stuff in case it is from the same node @@ -1271,13 +1290,17 @@ bool PeerManagerImpl::BlockRequested(NodeId nodeid, const CBlockIndex& block, st // Make sure it's not listed somewhere already. RemoveBlockRequest(hash); - std::list::iterator it = state->vBlocksInFlight.insert(state->vBlocksInFlight.end(), - {&block, std::unique_ptr(pit ? new PartiallyDownloadedBlock(&m_mempool) : nullptr)}); - state->nBlocksInFlight++; - if (state->nBlocksInFlight == 1) { - // We're starting a block download (batch) from this peer. - state->m_downloading_since = GetTime(); - m_peers_downloading_from++; + std::list::iterator it; + { + LOCK(state->m_mutex); + it = state->vBlocksInFlight.insert(state->vBlocksInFlight.end(), + {&block, std::unique_ptr(pit ? new PartiallyDownloadedBlock(&m_mempool) : nullptr)}); + state->nBlocksInFlight++; + if (state->nBlocksInFlight == 1) { + // We're starting a block download (batch) from this peer. + state->m_downloading_since = GetTime(); + m_peers_downloading_from++; + } } itInFlight = mapBlocksInFlight.insert(std::make_pair(hash, std::make_pair(nodeid, it))).first; if (pit) { @@ -1286,21 +1309,21 @@ bool PeerManagerImpl::BlockRequested(NodeId nodeid, const CBlockIndex& block, st return true; } -void PeerManagerImpl::MaybeSetPeerAsAnnouncingHeaderAndIDs(NodeId nodeid) +void PeerManagerImpl::MaybeSetPeerAsAnnouncingHeaderAndIDs(NodeId nodeid) EXCLUSIVE_LOCKS_REQUIRED(!m_node_states_mutex, !m_header_ann_mutex) { - AssertLockHeld(cs_main); - // When in -blocksonly mode, never request high-bandwidth mode from peers. Our // mempool will not contain the transactions necessary to reconstruct the // compact block. if (m_ignore_incoming_txs) return; - CNodeState* nodestate = State(nodeid); + NodeStateRef nodestate = State(nodeid); if (!nodestate || !nodestate->m_provides_cmpctblocks) { // Don't request compact blocks if the peer has not signalled support return; } + LOCK(m_header_ann_mutex); + int num_outbound_hb_peers = 0; for (std::list::iterator it = lNodesAnnouncingHeaderAndIDs.begin(); it != lNodesAnnouncingHeaderAndIDs.end(); it++) { if (*it == nodeid) { @@ -1308,14 +1331,14 @@ void PeerManagerImpl::MaybeSetPeerAsAnnouncingHeaderAndIDs(NodeId nodeid) lNodesAnnouncingHeaderAndIDs.push_back(nodeid); return; } - CNodeState *state = State(*it); + NodeStateRef state = State(*it); if (state != nullptr && !state->m_is_inbound) ++num_outbound_hb_peers; } if (nodestate->m_is_inbound) { // If we're adding an inbound HB peer, make sure we're not removing // our last outbound HB peer in the process. if (lNodesAnnouncingHeaderAndIDs.size() >= 3 && num_outbound_hb_peers == 1) { - CNodeState *remove_node = State(lNodesAnnouncingHeaderAndIDs.front()); + NodeStateRef remove_node = State(lNodesAnnouncingHeaderAndIDs.front()); if (remove_node != nullptr && !remove_node->m_is_inbound) { // Put the HB outbound peer in the second slot, so that it // doesn't get removed. @@ -1323,8 +1346,7 @@ void PeerManagerImpl::MaybeSetPeerAsAnnouncingHeaderAndIDs(NodeId nodeid) } } } - m_connman.ForNode(nodeid, [this](CNode* pfrom) EXCLUSIVE_LOCKS_REQUIRED(::cs_main) { - AssertLockHeld(::cs_main); + m_connman.ForNode(nodeid, [this](CNode* pfrom) EXCLUSIVE_LOCKS_REQUIRED(m_header_ann_mutex) { m_connman.PushMessage(pfrom, CNetMsgMaker(pfrom->GetCommonVersion()).Make(NetMsgType::SENDCMPCT, /*high_bandwidth=*/true, /*version=*/CMPCTBLOCKS_VERSION)); // save BIP152 bandwidth state: we select peer to be high-bandwidth pfrom->m_bip152_highbandwidth_to = true; @@ -1334,7 +1356,7 @@ void PeerManagerImpl::MaybeSetPeerAsAnnouncingHeaderAndIDs(NodeId nodeid) if (lNodesAnnouncingHeaderAndIDs.size() > 3) { // As per BIP152, we only get 3 of our peers to announce // blocks using compact encodings. - m_connman.ForNode(lNodesAnnouncingHeaderAndIDs.front(), [this](CNode* pnodeStop){ + m_connman.ForNode(lNodesAnnouncingHeaderAndIDs.front(), [this](CNode* pnodeStop) EXCLUSIVE_LOCKS_REQUIRED(m_header_ann_mutex) { m_connman.PushMessage(pnodeStop, CNetMsgMaker(pnodeStop->GetCommonVersion()).Make(NetMsgType::SENDCMPCT, /*high_bandwidth=*/false, /*version=*/CMPCTBLOCKS_VERSION)); // save BIP152 bandwidth state: we select peer to be low-bandwidth pnodeStop->m_bip152_highbandwidth_to = false; @@ -1359,19 +1381,20 @@ bool PeerManagerImpl::CanDirectFetch() return m_chainman.ActiveChain().Tip()->GetBlockTime() > GetAdjustedTime() - m_chainparams.GetConsensus().nPowTargetSpacing * 20; } -static bool PeerHasHeader(CNodeState *state, const CBlockIndex *pindex) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +static bool PeerHasHeader(const CNodeState& state, const CBlockIndex *pindex) EXCLUSIVE_LOCKS_REQUIRED(state.m_mutex) { - if (state->pindexBestKnownBlock && pindex == state->pindexBestKnownBlock->GetAncestor(pindex->nHeight)) + if (state.pindexBestKnownBlock && pindex == state.pindexBestKnownBlock->GetAncestor(pindex->nHeight)) return true; - if (state->pindexBestHeaderSent && pindex == state->pindexBestHeaderSent->GetAncestor(pindex->nHeight)) + if (state.pindexBestHeaderSent && pindex == state.pindexBestHeaderSent->GetAncestor(pindex->nHeight)) return true; return false; } -void PeerManagerImpl::ProcessBlockAvailability(NodeId nodeid) +void PeerManagerImpl::ProcessBlockAvailability(NodeId nodeid) EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_node_states_mutex) { - CNodeState *state = State(nodeid); + NodeStateRef state = State(nodeid); assert(state != nullptr); + LOCK(state->m_mutex); if (!state->hashLastUnknownBlock.IsNull()) { const CBlockIndex* pindex = m_chainman.m_blockman.LookupBlockIndex(state->hashLastUnknownBlock); @@ -1384,9 +1407,9 @@ void PeerManagerImpl::ProcessBlockAvailability(NodeId nodeid) } } -void PeerManagerImpl::UpdateBlockAvailability(NodeId nodeid, const uint256 &hash) +void PeerManagerImpl::UpdateBlockAvailability(NodeId nodeid, const uint256 &hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_node_states_mutex) { - CNodeState *state = State(nodeid); + NodeStateRef state = State(nodeid); assert(state != nullptr); ProcessBlockAvailability(nodeid); @@ -1394,27 +1417,30 @@ void PeerManagerImpl::UpdateBlockAvailability(NodeId nodeid, const uint256 &hash const CBlockIndex* pindex = m_chainman.m_blockman.LookupBlockIndex(hash); if (pindex && pindex->nChainWork > 0) { // An actually better block was announced. + LOCK(state->m_mutex); if (state->pindexBestKnownBlock == nullptr || pindex->nChainWork >= state->pindexBestKnownBlock->nChainWork) { state->pindexBestKnownBlock = pindex; } } else { // An unknown block was announced; just assume that the latest one is the best one. + LOCK(state->m_mutex); state->hashLastUnknownBlock = hash; } } -void PeerManagerImpl::FindNextBlocksToDownload(const Peer& peer, unsigned int count, std::vector& vBlocks, NodeId& nodeStaller) +void PeerManagerImpl::FindNextBlocksToDownload(const Peer& peer, unsigned int count, std::vector& vBlocks, NodeId& nodeStaller) EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_node_states_mutex) { if (count == 0) return; vBlocks.reserve(vBlocks.size() + count); - CNodeState *state = State(peer.m_id); + NodeStateRef state = State(peer.m_id); assert(state != nullptr); // Make sure pindexBestKnownBlock is up to date, we'll need it. ProcessBlockAvailability(peer.m_id); + LOCK(state->m_mutex); if (state->pindexBestKnownBlock == nullptr || state->pindexBestKnownBlock->nChainWork < m_chainman.ActiveChain().Tip()->nChainWork || state->pindexBestKnownBlock->nChainWork < nMinimumChainWork) { // This peer has nothing interesting. return; @@ -1523,25 +1549,29 @@ void PeerManagerImpl::PushNodeVersion(CNode& pnode, const Peer& peer) } } -void PeerManagerImpl::EraseObjectRequest(NodeId nodeid, const CInv& inv) +void PeerManagerImpl::EraseObjectRequest(NodeId nodeid, const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(!g_object_request_mutex, !m_node_states_mutex) { - AssertLockHeld(cs_main); - - CNodeState* state = State(nodeid); + NodeStateRef state = State(nodeid); if (state == nullptr) return; - LogPrint(BCLog::NET, "%s -- inv=(%s)\n", __func__, inv.ToString()); - g_already_asked_for.erase(inv.hash); - g_erased_object_requests.insert(std::make_pair(inv.hash, GetTime())); - state->m_object_download.m_object_announced.erase(inv); - state->m_object_download.m_object_in_flight.erase(inv); + { + LOCK(state->m_mutex); + state->m_object_download.m_object_announced.erase(inv); + state->m_object_download.m_object_in_flight.erase(inv); + } + + { + LOCK(g_object_request_mutex); + g_already_asked_for.erase(inv.hash); + g_erased_object_requests.insert(std::make_pair(inv.hash, GetTime())); + } } -std::chrono::microseconds GetObjectRequestTime(const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +std::chrono::microseconds GetObjectRequestTime(const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(!g_object_request_mutex) { - AssertLockHeld(cs_main); + LOCK(g_object_request_mutex); auto it = g_already_asked_for.find(inv.hash); if (it != g_already_asked_for.end()) { return it->second; @@ -1549,9 +1579,9 @@ std::chrono::microseconds GetObjectRequestTime(const CInv& inv) EXCLUSIVE_LOCKS_ return {}; } -void UpdateObjectRequestTime(const CInv& inv, std::chrono::microseconds request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +void UpdateObjectRequestTime(const CInv& inv, std::chrono::microseconds request_time) EXCLUSIVE_LOCKS_REQUIRED(!g_object_request_mutex) { - AssertLockHeld(cs_main); + LOCK(g_object_request_mutex); auto it = g_already_asked_for.find(inv.hash); if (it == g_already_asked_for.end()) { g_already_asked_for.insert(std::make_pair(inv.hash, request_time)); @@ -1589,9 +1619,8 @@ std::chrono::microseconds GetObjectRandomDelay(int invType) return {}; } -std::chrono::microseconds CalculateObjectGetDataTime(const CInv& inv, std::chrono::microseconds current_time, bool is_masternode, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +std::chrono::microseconds CalculateObjectGetDataTime(const CInv& inv, std::chrono::microseconds current_time, bool is_masternode, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(!g_object_request_mutex) { - AssertLockHeld(cs_main); std::chrono::microseconds process_time; const auto last_request_time = GetObjectRequestTime(inv); // First time requesting this tx @@ -1609,33 +1638,36 @@ std::chrono::microseconds CalculateObjectGetDataTime(const CInv& inv, std::chron return process_time; } -void PeerManagerImpl::RequestObject(NodeId nodeid, const CInv& inv, std::chrono::microseconds current_time, bool fForce) +void PeerManagerImpl::RequestObject(NodeId nodeid, const CInv& inv, std::chrono::microseconds current_time, bool fForce) EXCLUSIVE_LOCKS_REQUIRED(!g_object_request_mutex, !m_node_states_mutex) { - AssertLockHeld(cs_main); - - CNodeState* state = State(nodeid); + NodeStateRef state = State(nodeid); if (state == nullptr) return; - CNodeState::ObjectDownloadState& peer_download_state = state->m_object_download; - if (peer_download_state.m_object_announced.size() >= MAX_PEER_OBJECT_ANNOUNCEMENTS || - peer_download_state.m_object_process_time.size() >= MAX_PEER_OBJECT_ANNOUNCEMENTS || - peer_download_state.m_object_announced.count(inv)) { - // Too many queued announcements from this peer, or we already have - // this announcement - return; - } - peer_download_state.m_object_announced.insert(inv); + std::chrono::microseconds process_time; + { + LOCK(state->m_mutex); + CNodeState::ObjectDownloadState& peer_download_state = state->m_object_download; + if (peer_download_state.m_object_announced.size() >= MAX_PEER_OBJECT_ANNOUNCEMENTS || + peer_download_state.m_object_process_time.size() >= MAX_PEER_OBJECT_ANNOUNCEMENTS || + peer_download_state.m_object_announced.count(inv)) { + // Too many queued announcements from this peer, or we already have + // this announcement + return; + } + peer_download_state.m_object_announced.insert(inv); - // Calculate the time to try requesting this transaction. Use - // fPreferredDownload as a proxy for outbound peers. - std::chrono::microseconds process_time = CalculateObjectGetDataTime(inv, current_time, /*is_masternode=*/m_mn_activeman != nullptr, + // Calculate the time to try requesting this transaction. Use + // fPreferredDownload as a proxy for outbound peers. + process_time = CalculateObjectGetDataTime(inv, current_time, /*is_masternode=*/m_mn_activeman != nullptr, !state->fPreferredDownload); - peer_download_state.m_object_process_time.emplace(process_time, inv); + peer_download_state.m_object_process_time.emplace(process_time, inv); + } if (fForce) { // make sure this object is actually requested ASAP + LOCK(g_object_request_mutex); g_erased_object_requests.erase(inv.hash); g_already_asked_for.erase(inv.hash); } @@ -1643,14 +1675,13 @@ void PeerManagerImpl::RequestObject(NodeId nodeid, const CInv& inv, std::chrono: LogPrint(BCLog::NET, "%s -- inv=(%s), current_time=%d, process_time=%d, delta=%d\n", __func__, inv.ToString(), current_time.count(), process_time.count(), (process_time - current_time).count()); } -size_t PeerManagerImpl::GetRequestedObjectCount(NodeId nodeid) const +size_t PeerManagerImpl::GetRequestedObjectCount(NodeId nodeid) const EXCLUSIVE_LOCKS_REQUIRED(!m_node_states_mutex) { - AssertLockHeld(cs_main); - - const CNodeState* state = State(nodeid); - if (state == nullptr) + NodeStateRef state = State(nodeid); + if (state == nullptr) { return 0; - + } + LOCK(state->m_mutex); return state->m_object_download.m_object_process_time.size(); } @@ -1694,18 +1725,21 @@ void PeerManagerImpl::InterruptHandlers() } } -void PeerManagerImpl::UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) +void PeerManagerImpl::UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(!m_node_states_mutex) { - LOCK(cs_main); - CNodeState *state = State(node); - if (state) state->m_last_block_announcement = time_in_seconds; + NodeStateRef state = State(node); + if (!state) { + return; + } + LOCK(state->m_mutex); + state->m_last_block_announcement = time_in_seconds; } -void PeerManagerImpl::InitializeNode(CNode& node, ServiceFlags our_services) { +void PeerManagerImpl::InitializeNode(CNode& node, ServiceFlags our_services) EXCLUSIVE_LOCKS_REQUIRED(!m_node_states_mutex) { NodeId nodeid = node.GetId(); { - LOCK(cs_main); - m_node_states.emplace_hint(m_node_states.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(node.IsInboundConn())); + LOCK(m_node_states_mutex); + m_node_states.emplace_hint(m_node_states.end(), nodeid, std::make_shared(node.IsInboundConn())); } PeerRef peer = std::make_shared(nodeid, our_services); { @@ -1738,11 +1772,10 @@ void PeerManagerImpl::ReattemptInitialBroadcast(CScheduler& scheduler) scheduler.scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, delta); } -void PeerManagerImpl::FinalizeNode(const CNode& node) { +void PeerManagerImpl::FinalizeNode(const CNode& node) EXCLUSIVE_LOCKS_REQUIRED(!m_node_states_mutex) { NodeId nodeid = node.GetId(); int misbehavior{0}; LOCK(cs_main); - { { // We remove the PeerRef from g_peer_map here, but we don't always // destruct the Peer. Sometimes another thread is still holding a @@ -1753,26 +1786,33 @@ void PeerManagerImpl::FinalizeNode(const CNode& node) { assert(peer != nullptr); misbehavior = WITH_LOCK(peer->m_misbehavior_mutex, return peer->m_misbehavior_score); } - CNodeState *state = State(nodeid); + NodeStateRef state = State(nodeid); assert(state != nullptr); + { + LOCK(state->m_mutex); + if (state->fSyncStarted) + nSyncStarted--; - if (state->fSyncStarted) - nSyncStarted--; - - for (const QueuedBlock& entry : state->vBlocksInFlight) { - mapBlocksInFlight.erase(entry.pindex->GetBlockHash()); + for (const QueuedBlock& entry : state->vBlocksInFlight) { + mapBlocksInFlight.erase(entry.pindex->GetBlockHash()); + } + m_orphanage.EraseForPeer(nodeid); + if (m_txreconciliation) m_txreconciliation->ForgetPeer(nodeid); + m_num_preferred_download_peers -= state->fPreferredDownload; + m_peers_downloading_from -= (state->nBlocksInFlight != 0); + assert(m_peers_downloading_from >= 0); + m_outbound_peers_with_protect_from_disconnect -= state->m_chain_sync.m_protect; + assert(m_outbound_peers_with_protect_from_disconnect >= 0); } - m_orphanage.EraseForPeer(nodeid); - if (m_txreconciliation) m_txreconciliation->ForgetPeer(nodeid); - m_num_preferred_download_peers -= state->fPreferredDownload; - m_peers_downloading_from -= (state->nBlocksInFlight != 0); - assert(m_peers_downloading_from >= 0); - m_outbound_peers_with_protect_from_disconnect -= state->m_chain_sync.m_protect; - assert(m_outbound_peers_with_protect_from_disconnect >= 0); - m_node_states.erase(nodeid); + bool node_states_empty{false}; + { + LOCK(m_node_states_mutex); + m_node_states.erase(nodeid); + node_states_empty = m_node_states.empty(); + } - if (m_node_states.empty()) { + if (node_states_empty) { // Do a consistency check after the last peer is removed. assert(mapBlocksInFlight.empty()); assert(m_num_preferred_download_peers == 0); @@ -1780,7 +1820,6 @@ void PeerManagerImpl::FinalizeNode(const CNode& node) { assert(m_outbound_peers_with_protect_from_disconnect == 0); assert(m_orphanage.Size() == 0); } - } // cs_main if (node.fSuccessfullyConnected && misbehavior == 0 && !node.IsBlockOnlyConn() && !node.IsInboundConn()) { // Only change visible addrman state for full outbound peers. We don't @@ -1811,13 +1850,14 @@ PeerRef PeerManagerImpl::RemovePeer(NodeId id) return ret; } -bool PeerManagerImpl::GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats) const +bool PeerManagerImpl::GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats) const EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_node_states_mutex) { + auto state = State(nodeid); + if (state == nullptr) { + return false; + } { - LOCK(cs_main); - const CNodeState* state = State(nodeid); - if (state == nullptr) - return false; + LOCK(state->m_mutex); stats.nSyncHeight = state->pindexBestKnownBlock ? state->pindexBestKnownBlock->nHeight : -1; stats.nCommonHeight = state->pindexLastCommonBlock ? state->pindexLastCommonBlock->nHeight : -1; for (const QueuedBlock& queue : state->vBlocksInFlight) { @@ -1907,7 +1947,7 @@ bool PeerManagerImpl::IsBanned(NodeId pnode) } bool PeerManagerImpl::MaybePunishNodeForBlock(NodeId nodeid, const BlockValidationState& state, - bool via_compact_block, const std::string& message) + bool via_compact_block, const std::string& message) EXCLUSIVE_LOCKS_REQUIRED(!m_node_states_mutex) { switch (state.GetResult()) { case BlockValidationResult::BLOCK_RESULT_UNSET: @@ -1922,8 +1962,7 @@ bool PeerManagerImpl::MaybePunishNodeForBlock(NodeId nodeid, const BlockValidati break; case BlockValidationResult::BLOCK_CACHED_INVALID: { - LOCK(cs_main); - CNodeState *node_state = State(nodeid); + NodeStateRef node_state = State(nodeid); if (node_state == nullptr) { break; } @@ -1963,7 +2002,6 @@ bool PeerManagerImpl::MaybePunishNodeForTx(NodeId nodeid, const TxValidationStat // The node is providing invalid data: case TxValidationResult::TX_CONSENSUS: { - LOCK(cs_main); Misbehaving(nodeid, 100); return true; } @@ -1993,7 +2031,7 @@ bool PeerManagerImpl::BlockRequestAllowed(const CBlockIndex* pindex) (GetBlockProofEquivalentTime(*m_chainman.m_best_header, *pindex, *m_chainman.m_best_header, m_chainparams.GetConsensus()) < STALE_RELAY_AGE_LIMIT); } -std::optional PeerManagerImpl::FetchBlock(NodeId peer_id, const CBlockIndex& block_index) +std::optional PeerManagerImpl::FetchBlock(NodeId peer_id, const CBlockIndex& block_index) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_node_states_mutex) { if (fImporting) return "Importing..."; if (fReindex) return "Reindexing..."; @@ -2173,16 +2211,20 @@ void PeerManagerImpl::NewPoWValidBlock(const CBlockIndex *pindex, const std::sha if (pnode->fDisconnect) return; ProcessBlockAvailability(pnode->GetId()); - CNodeState &state = *State(pnode->GetId()); + NodeStateRef state_ref = State(pnode->GetId()); + if (!state_ref) return; // If the peer has, or we announced to them the previous block already, // but we don't think they have this one, go ahead and announce it - if (state.m_requested_hb_cmpctblocks && !PeerHasHeader(&state, pindex) && PeerHasHeader(&state, pindex->pprev)) { - LogPrint(BCLog::NET, "%s sending header-and-ids %s to peer=%d\n", "PeerManager::NewPoWValidBlock", - hashBlock.ToString(), pnode->GetId()); - - const CSerializedNetMsg& ser_cmpctblock{lazy_ser.get()}; - m_connman.PushMessage(pnode, ser_cmpctblock.Copy()); - state.pindexBestHeaderSent = pindex; + { + LOCK(state_ref->m_mutex); + if (state_ref->m_requested_hb_cmpctblocks && !PeerHasHeader(*state_ref, pindex) && PeerHasHeader(*state_ref, pindex->pprev)) { + LogPrint(BCLog::NET, "%s sending header-and-ids %s to peer=%d\n", "PeerManager::NewPoWValidBlock", + hashBlock.ToString(), pnode->GetId()); + + const CSerializedNetMsg& ser_cmpctblock{lazy_ser.get()}; + m_connman.PushMessage(pnode, ser_cmpctblock.Copy()); + state_ref->pindexBestHeaderSent = pindex; + } } }); } @@ -2232,7 +2274,7 @@ void PeerManagerImpl::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlock * Handle invalid block rejection and consequent peer discouragement, maintain which * peers announce compact blocks. */ -void PeerManagerImpl::BlockChecked(const CBlock& block, const BlockValidationState& state) +void PeerManagerImpl::BlockChecked(const CBlock& block, const BlockValidationState& state) EXCLUSIVE_LOCKS_REQUIRED(!m_node_states_mutex, !m_header_ann_mutex) { LOCK(cs_main); @@ -2369,7 +2411,7 @@ void PeerManagerImpl::SendPings() for(auto& it : m_peer_map) it.second->m_ping_queued = true; } -void PeerManagerImpl::AskPeersForTransaction(const uint256& txid) +void PeerManagerImpl::AskPeersForTransaction(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !g_object_request_mutex, !m_node_states_mutex) { std::vector peersToAsk; peersToAsk.reserve(4); @@ -2386,15 +2428,12 @@ void PeerManagerImpl::AskPeersForTransaction(const uint256& txid) } } } - { - CInv inv(MSG_TX, txid); - LOCK(cs_main); - for (PeerRef& peer : peersToAsk) { - LogPrintf("PeerManagerImpl::%s -- txid=%s: asking other peer %d for correct TX\n", __func__, - txid.ToString(), peer->m_id); + CInv inv(MSG_TX, txid); + for (PeerRef& peer : peersToAsk) { + LogPrintf("PeerManagerImpl::%s -- txid=%s: asking other peer %d for correct TX\n", __func__, + txid.ToString(), peer->m_id); - RequestObject(peer->m_id, inv, GetTime(), /*fForce=*/true); - } + RequestObject(peer->m_id, inv, GetTime(), /*fForce=*/true); } } @@ -2743,7 +2782,7 @@ void PeerManagerImpl::ProcessGetBlockData(CNode& pfrom, Peer& peer, const CInv& } //! Determine whether or not a peer can request a transaction, and return it (or nullptr if not found or not allowed). -CTransactionRef PeerManagerImpl::FindTxForGetData(const CNode* peer, const uint256& txid, const std::chrono::seconds mempool_req, const std::chrono::seconds now) +CTransactionRef PeerManagerImpl::FindTxForGetData(const CNode* peer, const uint256& txid, const std::chrono::seconds mempool_req, const std::chrono::seconds now) LOCKS_EXCLUDED(cs_main) EXCLUSIVE_LOCKS_REQUIRED(!m_node_states_mutex) { auto txinfo = m_mempool.info(txid); if (txinfo.tx) { @@ -2759,19 +2798,23 @@ CTransactionRef PeerManagerImpl::FindTxForGetData(const CNode* peer, const uint2 LOCK(cs_main); // Otherwise, the transaction must have been announced recently. - if (State(peer->GetId())->m_recently_announced_invs.contains(txid)) { - // If it was, it can be relayed from either the mempool... - if (txinfo.tx) return std::move(txinfo.tx); - // ... or the relay pool. - auto mi = mapRelay.find(txid); - if (mi != mapRelay.end()) return mi->second; + NodeStateRef state = State(peer->GetId()); + if (state) { + LOCK(state->m_mutex); + if (state->m_recently_announced_invs.contains(txid)) { + // If it was, it can be relayed from either the mempool... + if (txinfo.tx) return std::move(txinfo.tx); + // ... or the relay pool. + auto mi = mapRelay.find(txid); + if (mi != mapRelay.end()) return mi->second; + } } } return {}; } -void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic& interruptMsgProc) +void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic& interruptMsgProc) EXCLUSIVE_LOCKS_REQUIRED(!m_most_recent_block_mutex, peer.m_getdata_requests_mutex, !m_node_states_mutex) LOCKS_EXCLUDED(::cs_main) { AssertLockNotHeld(cs_main); @@ -2846,8 +2889,11 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic for (const uint256& parent_txid : parent_ids_to_add) { // Relaying a transaction with a recent but unconfirmed parent. if (WITH_LOCK(tx_relay->m_tx_inventory_mutex, return !tx_relay->m_tx_inventory_known_filter.contains(parent_txid))) { - LOCK(cs_main); - State(pfrom.GetId())->m_recently_announced_invs.insert(parent_txid); + NodeStateRef state = State(pfrom.GetId()); + if (state) { + LOCK(state->m_mutex); + state->m_recently_announced_invs.insert(parent_txid); + } } } } @@ -3035,32 +3081,36 @@ void PeerManagerImpl::SendBlockTransactions(CNode& pfrom, const CBlock& block, c * nUnconnectingHeaders gets reset back to 0. */ void PeerManagerImpl::HandleFewUnconnectingHeaders(CNode& pfrom, Peer& peer, - const std::vector& headers) + const std::vector& headers) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex, !m_node_states_mutex) { LOCK(cs_main); - CNodeState *nodestate = State(pfrom.GetId()); - - nodestate->nUnconnectingHeaders++; - // Try to fill in the missing headers. - std::string msg_type = UsesCompressedHeaders(peer) ? NetMsgType::GETHEADERS2 : NetMsgType::GETHEADERS; - if (MaybeSendGetHeaders(pfrom, msg_type, m_chainman.ActiveChain().GetLocator(m_chainman.m_best_header), peer)) { - LogPrint(BCLog::NET, "received header %s: missing prev block %s, sending %s (%d) to end (peer=%d, nUnconnectingHeaders=%d)\n", - headers[0].GetHash().ToString(), - headers[0].hashPrevBlock.ToString(), - msg_type, - m_chainman.m_best_header->nHeight, - pfrom.GetId(), nodestate->nUnconnectingHeaders); + NodeStateRef nodestate = State(pfrom.GetId()); + if (!nodestate) return; + + { + LOCK(nodestate->m_mutex); + nodestate->nUnconnectingHeaders++; + // Try to fill in the missing headers. + std::string msg_type = UsesCompressedHeaders(peer) ? NetMsgType::GETHEADERS2 : NetMsgType::GETHEADERS; + int nUnconnectingHeaders = nodestate->nUnconnectingHeaders; + if (MaybeSendGetHeaders(pfrom, msg_type, m_chainman.ActiveChain().GetLocator(m_chainman.m_best_header), peer)) { + LogPrint(BCLog::NET, "received header %s: missing prev block %s, sending %s (%d) to end (peer=%d, nUnconnectingHeaders=%d)\n", + headers[0].GetHash().ToString(), + headers[0].hashPrevBlock.ToString(), + msg_type, + m_chainman.m_best_header->nHeight, + pfrom.GetId(), nUnconnectingHeaders); + } + // The peer may just be broken, so periodically assign DoS points if this + // condition persists. + if (nUnconnectingHeaders % MAX_UNCONNECTING_HEADERS == 0) { + Misbehaving(pfrom.GetId(), 20, strprintf("%d non-connecting headers", nUnconnectingHeaders)); + } } // Set hashLastUnknownBlock for this peer, so that if we // eventually get the headers - even from a different peer - // we can use this peer to download. UpdateBlockAvailability(pfrom.GetId(), headers.back().GetHash()); - - // The peer may just be broken, so periodically assign DoS points if this - // condition persists. - if (nodestate->nUnconnectingHeaders % MAX_UNCONNECTING_HEADERS == 0) { - Misbehaving(pfrom.GetId(), 20, strprintf("%d non-connecting headers", nodestate->nUnconnectingHeaders)); - } } bool PeerManagerImpl::CheckHeadersAreContinuous(const std::vector& headers) const @@ -3098,12 +3148,13 @@ bool PeerManagerImpl::MaybeSendGetHeaders(CNode& pfrom, const std::string& msg_t * We require that the given tip have at least as much work as our tip, and for * our current tip to be "close to synced" (see CanDirectFetch()). */ -void PeerManagerImpl::HeadersDirectFetchBlocks(CNode& pfrom, const Peer& peer, const CBlockIndex* pindexLast) +void PeerManagerImpl::HeadersDirectFetchBlocks(CNode& pfrom, const Peer& peer, const CBlockIndex* pindexLast) EXCLUSIVE_LOCKS_REQUIRED(!m_node_states_mutex) { const CNetMsgMaker msgMaker(pfrom.GetCommonVersion()); LOCK(cs_main); - CNodeState *nodestate = State(pfrom.GetId()); + NodeStateRef nodestate = State(pfrom.GetId()); + if (!nodestate) return; if (CanDirectFetch() && pindexLast->IsValid(BLOCK_VALID_TREE) && m_chainman.ActiveChain().Tip()->nChainWork <= pindexLast->nChainWork) { @@ -3128,16 +3179,25 @@ void PeerManagerImpl::HeadersDirectFetchBlocks(CNode& pfrom, const Peer& peer, c pindexLast->nHeight); } else { std::vector vGetData; + int nBlocksInFlight; + bool provides_cmpctblocks; + { + LOCK(nodestate->m_mutex); + nBlocksInFlight = nodestate->nBlocksInFlight; + } + provides_cmpctblocks = nodestate->m_provides_cmpctblocks; // Download as much as possible, from earliest to latest. for (const CBlockIndex *pindex : vToFetch | std::views::reverse) { - if (nodestate->nBlocksInFlight >= MAX_BLOCKS_IN_TRANSIT_PER_PEER) { + if (nBlocksInFlight >= MAX_BLOCKS_IN_TRANSIT_PER_PEER) { // Can't download any more from this peer break; } vGetData.push_back(CInv(MSG_BLOCK, pindex->GetBlockHash())); - BlockRequested(pfrom.GetId(), *pindex); - LogPrint(BCLog::NET, "Requesting block %s from peer=%d\n", - pindex->GetBlockHash().ToString(), pfrom.GetId()); + if (BlockRequested(pfrom.GetId(), *pindex)) { + ++nBlocksInFlight; + LogPrint(BCLog::NET, "Requesting block %s from peer=%d\n", + pindex->GetBlockHash().ToString(), pfrom.GetId()); + } } if (vGetData.size() > 1) { LogPrint(BCLog::NET, "Downloading blocks toward %s (%d) via headers direct fetch\n", @@ -3145,7 +3205,7 @@ void PeerManagerImpl::HeadersDirectFetchBlocks(CNode& pfrom, const Peer& peer, c } if (vGetData.size() > 0) { if (!m_ignore_incoming_txs && - nodestate->m_provides_cmpctblocks && + provides_cmpctblocks && vGetData.size() == 1 && mapBlocksInFlight.size() == 1 && pindexLast->pprev->IsValid(BLOCK_VALID_CHAIN)) { @@ -3164,14 +3224,18 @@ void PeerManagerImpl::HeadersDirectFetchBlocks(CNode& pfrom, const Peer& peer, c * update the state we keep for the peer. */ void PeerManagerImpl::UpdatePeerStateForReceivedHeaders(CNode& pfrom, - const CBlockIndex *pindexLast, bool received_new_header, bool may_have_more_headers) + const CBlockIndex *pindexLast, bool received_new_header, bool may_have_more_headers) EXCLUSIVE_LOCKS_REQUIRED(!m_node_states_mutex) { LOCK(cs_main); - CNodeState *nodestate = State(pfrom.GetId()); - if (nodestate->nUnconnectingHeaders > 0) { - LogPrint(BCLog::NET, "peer=%d: resetting nUnconnectingHeaders (%d -> 0)\n", pfrom.GetId(), nodestate->nUnconnectingHeaders); + NodeStateRef nodestate = State(pfrom.GetId()); + if (!nodestate) return; + { + LOCK(nodestate->m_mutex); + if (nodestate->nUnconnectingHeaders > 0) { + LogPrint(BCLog::NET, "peer=%d: resetting nUnconnectingHeaders (%d -> 0)\n", pfrom.GetId(), nodestate->nUnconnectingHeaders); + } + nodestate->nUnconnectingHeaders = 0; } - nodestate->nUnconnectingHeaders = 0; assert(pindexLast); UpdateBlockAvailability(pfrom.GetId(), pindexLast->GetBlockHash()); @@ -3180,27 +3244,30 @@ void PeerManagerImpl::UpdatePeerStateForReceivedHeaders(CNode& pfrom, // because it is set in UpdateBlockAvailability. Some nullptr checks // are still present, however, as belt-and-suspenders. - if (received_new_header && pindexLast->nChainWork > m_chainman.ActiveChain().Tip()->nChainWork) { - nodestate->m_last_block_announcement = GetTime(); - } - - // If we're in IBD, we want outbound peers that will serve us a useful - // chain. Disconnect peers that are on chains with insufficient work. - if (m_chainman.ActiveChainstate().IsInitialBlockDownload() && !may_have_more_headers) { - // If the peer has no more headers to give us, then we know we have - // their tip. - if (nodestate->pindexBestKnownBlock && nodestate->pindexBestKnownBlock->nChainWork < nMinimumChainWork) { - // This peer has too little work on their headers chain to help - // us sync -- disconnect if it is an outbound disconnection - // candidate. - // Note: We compare their tip to nMinimumChainWork (rather than - // m_chainman.ActiveChain().Tip()) because we won't start block download - // until we have a headers chain that has at least - // nMinimumChainWork, even if a peer has a chain past our tip, - // as an anti-DoS measure. - if (pfrom.IsOutboundOrBlockRelayConn()) { - LogPrintf("Disconnecting outbound peer %d -- headers chain has insufficient work\n", pfrom.GetId()); - pfrom.fDisconnect = true; + { + LOCK(nodestate->m_mutex); + if (received_new_header && pindexLast->nChainWork > m_chainman.ActiveChain().Tip()->nChainWork) { + nodestate->m_last_block_announcement = GetTime(); + } + + // If we're in IBD, we want outbound peers that will serve us a useful + // chain. Disconnect peers that are on chains with insufficient work. + if (m_chainman.ActiveChainstate().IsInitialBlockDownload() && !may_have_more_headers) { + // If the peer has no more headers to give us, then we know we have + // their tip. + if (nodestate->pindexBestKnownBlock && nodestate->pindexBestKnownBlock->nChainWork < nMinimumChainWork) { + // This peer has too little work on their headers chain to help + // us sync -- disconnect if it is an outbound disconnection + // candidate. + // Note: We compare their tip to nMinimumChainWork (rather than + // m_chainman.ActiveChain().Tip()) because we won't start block download + // until we have a headers chain that has at least + // nMinimumChainWork, even if a peer has a chain past our tip, + // as an anti-DoS measure. + if (pfrom.IsOutboundOrBlockRelayConn()) { + LogPrintf("Disconnecting outbound peer %d -- headers chain has insufficient work\n", pfrom.GetId()); + pfrom.fDisconnect = true; + } } } } @@ -3210,18 +3277,21 @@ void PeerManagerImpl::UpdatePeerStateForReceivedHeaders(CNode& pfrom, // Note that outbound block-relay peers are excluded from this protection, and // thus always subject to eviction under the bad/lagging chain logic. // See ChainSyncTimeoutState. - if (!pfrom.fDisconnect && pfrom.IsFullOutboundConn() && nodestate->pindexBestKnownBlock != nullptr) { - if (m_outbound_peers_with_protect_from_disconnect < MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT && nodestate->pindexBestKnownBlock->nChainWork >= m_chainman.ActiveChain().Tip()->nChainWork && !nodestate->m_chain_sync.m_protect) { - LogPrint(BCLog::NET, "Protecting outbound peer=%d from eviction\n", pfrom.GetId()); - nodestate->m_chain_sync.m_protect = true; - ++m_outbound_peers_with_protect_from_disconnect; + if (!pfrom.fDisconnect && pfrom.IsFullOutboundConn()) { + LOCK(nodestate->m_mutex); + if (nodestate->pindexBestKnownBlock != nullptr) { + if (m_outbound_peers_with_protect_from_disconnect < MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT && nodestate->pindexBestKnownBlock->nChainWork >= m_chainman.ActiveChain().Tip()->nChainWork && !nodestate->m_chain_sync.m_protect) { + LogPrint(BCLog::NET, "Protecting outbound peer=%d from eviction\n", pfrom.GetId()); + nodestate->m_chain_sync.m_protect = true; + ++m_outbound_peers_with_protect_from_disconnect; + } } } } void PeerManagerImpl::ProcessHeadersMessage(CNode& pfrom, Peer& peer, const std::vector& headers, - bool via_compact_block) + bool via_compact_block) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex, !m_node_states_mutex) { const CNetMsgMaker msgMaker(pfrom.GetCommonVersion()); size_t nCount = headers.size(); @@ -3573,13 +3643,13 @@ void PeerManagerImpl::ProcessBlock(CNode& node, const std::shared_ptrscore, result.m_error->message); } if (result.m_to_erase) { - WITH_LOCK(cs_main, EraseObjectRequest(node, result.m_to_erase.value())); + EraseObjectRequest(node, result.m_to_erase.value()); } for (const auto& tx : result.m_transactions) { WITH_LOCK(cs_main, _RelayTransaction(tx)); @@ -3672,7 +3742,7 @@ void PeerManagerImpl::ProcessMessage( const std::string& msg_type, CDataStream& vRecv, const std::chrono::microseconds time_received, - const std::atomic& interruptMsgProc) + const std::atomic& interruptMsgProc) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, g_msgproc_mutex, !m_node_states_mutex, !g_object_request_mutex) { AssertLockHeld(g_msgproc_mutex); @@ -3854,9 +3924,12 @@ void PeerManagerImpl::ProcessMessage( // Potentially mark this peer as a preferred download peer. { LOCK(cs_main); - CNodeState* state = State(pfrom.GetId()); - state->fPreferredDownload = (!pfrom.IsInboundConn() || pfrom.HasPermission(NetPermissionFlags::NoBan)) && !pfrom.IsAddrFetchConn() && CanServeBlocks(*peer); - m_num_preferred_download_peers += state->fPreferredDownload; + NodeStateRef state = State(pfrom.GetId()); + if (state) { + LOCK(state->m_mutex); + state->fPreferredDownload = (!pfrom.IsInboundConn() || pfrom.HasPermission(NetPermissionFlags::NoBan)) && !pfrom.IsAddrFetchConn() && CanServeBlocks(*peer); + m_num_preferred_download_peers += state->fPreferredDownload; + } } // Attempt to initialize address relay for outbound peers and use result @@ -3992,13 +4065,21 @@ void PeerManagerImpl::ProcessMessage( if (msg_type == NetMsgType::SENDHEADERS) { LOCK(cs_main); - State(pfrom.GetId())->fPreferHeaders = true; + NodeStateRef state = State(pfrom.GetId()); + if (state) { + LOCK(state->m_mutex); + state->fPreferHeaders = true; + } return; } if (msg_type == NetMsgType::SENDHEADERS2) { LOCK(cs_main); - State(pfrom.GetId())->fPreferHeadersCompressed = true; + NodeStateRef state = State(pfrom.GetId()); + if (state) { + LOCK(state->m_mutex); + state->fPreferHeadersCompressed = true; + } return; } @@ -4009,10 +4090,13 @@ void PeerManagerImpl::ProcessMessage( if (sendcmpct_version != CMPCTBLOCKS_VERSION) return; - LOCK(cs_main); - CNodeState *nodestate = State(pfrom.GetId()); - nodestate->m_provides_cmpctblocks = true; - nodestate->m_requested_hb_cmpctblocks = sendcmpct_hb; + NodeStateRef nodestate = State(pfrom.GetId()); + if (!nodestate) return; + { + LOCK(nodestate->m_mutex); + nodestate->m_provides_cmpctblocks = true; + nodestate->m_requested_hb_cmpctblocks = sendcmpct_hb; + } // save whether peer selects us as BIP152 high-bandwidth peer // (receiving sendcmpct(1) signals high-bandwidth, sendcmpct(0) low-bandwidth) pfrom.m_bip152_highbandwidth_from = sendcmpct_hb; @@ -4321,15 +4405,21 @@ void PeerManagerImpl::ProcessMessage( // block to sync with as well, to sync quicker in the case where // our initial peer is unresponsive (but less bandwidth than we'd // use if we turned on sync with all peers). - CNodeState& state{*Assert(State(pfrom.GetId()))}; - if (state.fSyncStarted || (!peer->m_inv_triggered_getheaders_before_sync && *best_block != m_last_block_inv_triggering_headers_sync)) { + NodeStateRef state_ref = State(pfrom.GetId()); + if (!state_ref) return; + bool fSyncStarted; + { + LOCK(state_ref->m_mutex); + fSyncStarted = state_ref->fSyncStarted; + } + if (fSyncStarted || (!peer->m_inv_triggered_getheaders_before_sync && *best_block != m_last_block_inv_triggering_headers_sync)) { std::string msg_type = UsesCompressedHeaders(*peer) ? NetMsgType::GETHEADERS2 : NetMsgType::GETHEADERS; if (MaybeSendGetHeaders(pfrom, msg_type, m_chainman.ActiveChain().GetLocator(m_chainman.m_best_header), *peer)) { LogPrint(BCLog::NET, "%s (%d) %s to peer=%d\n", msg_type, m_chainman.m_best_header->nHeight, best_block->ToString(), pfrom.GetId()); } - if (!state.fSyncStarted) { + if (!fSyncStarted) { peer->m_inv_triggered_getheaders_before_sync = true; // Update the last block hash that triggered a new headers // sync, so that we don't turn on headers sync with more @@ -4519,7 +4609,7 @@ void PeerManagerImpl::ProcessMessage( return; } - CNodeState *nodestate = State(pfrom.GetId()); + NodeStateRef nodestate = State(pfrom.GetId()); const CBlockIndex* pindex = nullptr; if (locator.IsNull()) { @@ -4562,7 +4652,10 @@ void PeerManagerImpl::ProcessMessage( // without the new block. By resetting the BestHeaderSent, we ensure we // will re-announce the new block via headers (or compact blocks again) // in the SendMessages logic. - nodestate->pindexBestHeaderSent = pindex ? pindex : m_chainman.ActiveChain().Tip(); + { + LOCK(nodestate->m_mutex); + nodestate->pindexBestHeaderSent = pindex ? pindex : m_chainman.ActiveChain().Tip(); + } m_connman.PushMessage(&pfrom, msgMaker.Make(msg_type_internal, v_headers)); }; @@ -4609,10 +4702,7 @@ void PeerManagerImpl::ProcessMessage( AddKnownInv(*peer, txid); CInv inv(nInvType, tx.GetHash()); - { - LOCK(cs_main); - EraseObjectRequest(pfrom.GetId(), inv); - } + EraseObjectRequest(pfrom.GetId(), inv); // Process custom logic, no matter if tx will be accepted to mempool later or not if (nInvType == MSG_DSTX) { @@ -4814,12 +4904,16 @@ void PeerManagerImpl::ProcessMessage( assert(pindex); UpdateBlockAvailability(pfrom.GetId(), pindex->GetBlockHash()); - CNodeState *nodestate = State(pfrom.GetId()); + NodeStateRef nodestate = State(pfrom.GetId()); + if (!nodestate) return; // If this was a new header with more work than our tip, update the // peer's last block announcement time - if (received_new_header && pindex->nChainWork > m_chainman.ActiveChain().Tip()->nChainWork) { - nodestate->m_last_block_announcement = GetTime(); + { + LOCK(nodestate->m_mutex); + if (received_new_header && pindex->nChainWork > m_chainman.ActiveChain().Tip()->nChainWork) { + nodestate->m_last_block_announcement = GetTime(); + } } std::map::iterator> >::iterator blockInFlightIt = mapBlocksInFlight.find(pindex->GetBlockHash()); @@ -4847,7 +4941,12 @@ void PeerManagerImpl::ProcessMessage( // We want to be a bit conservative just to be extra careful about DoS // possibilities in compact block processing... if (pindex->nHeight <= m_chainman.ActiveChain().Height() + 2) { - if ((!fAlreadyInFlight && nodestate->nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) || + int nBlocksInFlight; + { + LOCK(nodestate->m_mutex); + nBlocksInFlight = nodestate->nBlocksInFlight; + } + if ((!fAlreadyInFlight && nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) || (fAlreadyInFlight && blockInFlightIt->second.first == pfrom.GetId())) { std::list::iterator *queuedBlockIt = nullptr; if (!BlockRequested(pfrom.GetId(), *pindex, &queuedBlockIt)) { @@ -5385,11 +5484,12 @@ void PeerManagerImpl::ProcessMessage( } if (msg_type == NetMsgType::NOTFOUND) { // Remove the NOTFOUND transactions from the peer - LOCK(cs_main); - CNodeState *state = State(pfrom.GetId()); + NodeStateRef state = State(pfrom.GetId()); + if (!state) return; std::vector vInv; vRecv >> vInv; if (vInv.size() <= MAX_PEER_OBJECT_IN_FLIGHT + MAX_BLOCKS_IN_TRANSIT_PER_PEER) { + LOCK(state->m_mutex); for (CInv &inv : vInv) { if (inv.IsKnownType()) { // If we receive a NOTFOUND message for a txid we requested, erase @@ -5442,7 +5542,7 @@ void PeerManagerImpl::ProcessMessage( chainlock::ChainLockSig clsig; vRecv >> clsig; const uint256& hash = ::SerializeHash(clsig); - WITH_LOCK(::cs_main, EraseObjectRequest(pfrom.GetId(), CInv{MSG_CLSIG, hash})); + EraseObjectRequest(pfrom.GetId(), CInv{MSG_CLSIG, hash}); PostProcessMessage(m_llmq_ctx->clhandler->ProcessNewChainLock(pfrom.GetId(), clsig, hash), pfrom.GetId()); } return; // CLSIG @@ -5499,7 +5599,7 @@ bool PeerManagerImpl::MaybeDiscourageAndDisconnect(CNode& pnode, Peer& peer) return true; } -bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic& interruptMsgProc) +bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic& interruptMsgProc) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, g_msgproc_mutex, !m_node_states_mutex, !g_object_request_mutex) { AssertLockHeld(g_msgproc_mutex); @@ -5574,12 +5674,15 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic& interrupt return fMoreWork; } -void PeerManagerImpl::ConsiderEviction(CNode& pto, Peer& peer, std::chrono::seconds time_in_seconds) +void PeerManagerImpl::ConsiderEviction(CNode& pto, Peer& peer, std::chrono::seconds time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_msgproc_mutex, !m_node_states_mutex) { AssertLockHeld(cs_main); - CNodeState &state = *State(pto.GetId()); + NodeStateRef state_ref = State(pto.GetId()); + if (!state_ref) return; + CNodeState &state = *state_ref; + LOCK(state.m_mutex); if (!state.m_chain_sync.m_protect && pto.IsOutboundOrBlockRelayConn() && state.fSyncStarted) { // This is an outbound peer subject to disconnection if they don't // announce a block with as much work as the current tip within @@ -5664,16 +5767,26 @@ void PeerManagerImpl::EvictExtraOutboundPeers(std::chrono::seconds now) // at all. // Note that we only request blocks from a peer if we learn of a // valid headers chain with at least as much work as our tip. - CNodeState *node_state = State(pnode->GetId()); - if (node_state == nullptr || - (now - pnode->m_connected >= MINIMUM_CONNECT_TIME && node_state->nBlocksInFlight == 0)) { + NodeStateRef node_state = State(pnode->GetId()); + if (node_state == nullptr) { + pnode->fDisconnect = true; + LogPrint(BCLog::NET, "disconnecting extra block-relay-only peer=%d (last block received at time %d)\n", + pnode->GetId(), count_seconds(pnode->m_last_block_time)); + return true; + } + int nBlocksInFlight; + { + LOCK(node_state->m_mutex); + nBlocksInFlight = node_state->nBlocksInFlight; + } + if (now - pnode->m_connected >= MINIMUM_CONNECT_TIME && nBlocksInFlight == 0) { pnode->fDisconnect = true; LogPrint(BCLog::NET, "disconnecting extra block-relay-only peer=%d (last block received at time %d)\n", pnode->GetId(), count_seconds(pnode->m_last_block_time)); return true; } else { LogPrint(BCLog::NET, "keeping block-relay-only peer=%d chosen for eviction (connect time: %d, blocks_in_flight: %d)\n", - pnode->GetId(), count_seconds(pnode->m_connected), node_state->nBlocksInFlight); + pnode->GetId(), count_seconds(pnode->m_connected), nBlocksInFlight); } return false; }); @@ -5702,16 +5815,23 @@ void PeerManagerImpl::EvictExtraOutboundPeers(std::chrono::seconds now) // Only consider outbound-full-relay peers that are not already // marked for disconnection if (!pnode->IsFullOutboundConn() || pnode->fDisconnect) return; - CNodeState *state = State(pnode->GetId()); + NodeStateRef state = State(pnode->GetId()); if (state == nullptr) return; // shouldn't be possible, but just in case // Don't evict our protected peers - if (state->m_chain_sync.m_protect) return; + bool m_protect; + int64_t m_last_block_announcement; + { + LOCK(state->m_mutex); + m_protect = state->m_chain_sync.m_protect; + m_last_block_announcement = state->m_last_block_announcement; + } + if (m_protect) return; // If this is the only connection on a particular network that is // OUTBOUND_FULL_RELAY or MANUAL, protect it. if (!m_connman.MultipleManualOrFullOutboundConns(pnode->addr.GetNetwork())) return; - if (state->m_last_block_announcement < oldest_block_announcement || (state->m_last_block_announcement == oldest_block_announcement && pnode->GetId() > worst_peer)) { + if (m_last_block_announcement < oldest_block_announcement || (m_last_block_announcement == oldest_block_announcement && pnode->GetId() > worst_peer)) { worst_peer = pnode->GetId(); - oldest_block_announcement = state->m_last_block_announcement; + oldest_block_announcement = m_last_block_announcement; } }); if (worst_peer != -1) { @@ -5723,14 +5843,20 @@ void PeerManagerImpl::EvictExtraOutboundPeers(std::chrono::seconds now) // it time for new information to have arrived. // Also don't disconnect any peer we're trying to download a // block from. - CNodeState &state = *State(pnode->GetId()); - if (now - pnode->m_connected > MINIMUM_CONNECT_TIME && state.nBlocksInFlight == 0) { + NodeStateRef state_ref = State(pnode->GetId()); + if (!state_ref) return false; + int nBlocksInFlight; + { + LOCK(state_ref->m_mutex); + nBlocksInFlight = state_ref->nBlocksInFlight; + } + if (now - pnode->m_connected > MINIMUM_CONNECT_TIME && nBlocksInFlight == 0) { LogPrint(BCLog::NET, "disconnecting extra outbound peer=%d (last block announcement received at time %d)\n", pnode->GetId(), oldest_block_announcement); pnode->fDisconnect = true; return true; } else { LogPrint(BCLog::NET, "keeping outbound peer=%d chosen for eviction (connect time: %d, blocks_in_flight: %d)\n", - pnode->GetId(), count_seconds(pnode->m_connected), state.nBlocksInFlight); + pnode->GetId(), count_seconds(pnode->m_connected), nBlocksInFlight); return false; } }); @@ -5925,7 +6051,7 @@ bool PeerManagerImpl::SetupAddressRelay(const CNode& node, Peer& peer) return true; } -bool PeerManagerImpl::SendMessages(CNode* pto) +bool PeerManagerImpl::SendMessages(CNode* pto) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, g_msgproc_mutex, !g_object_request_mutex, !m_node_states_mutex) { AssertLockHeld(g_msgproc_mutex); @@ -5966,7 +6092,9 @@ bool PeerManagerImpl::SendMessages(CNode* pto) { LOCK(cs_main); - CNodeState &state = *State(pto->GetId()); + NodeStateRef state_ref = State(pto->GetId()); + if (!state_ref) return true; + CNodeState &state = *state_ref; // Start block sync if (m_chainman.m_best_header == nullptr) { @@ -5977,7 +6105,14 @@ bool PeerManagerImpl::SendMessages(CNode* pto) // block download from this peer -- this mostly affects behavior while // in IBD (once out of IBD, we sync from all peers). bool sync_blocks_and_headers_from_peer = false; - if (state.fPreferredDownload) { + bool fPreferredDownload; + bool fSyncStarted; + { + LOCK(state.m_mutex); + fPreferredDownload = state.fPreferredDownload; + fSyncStarted = state.fSyncStarted; + } + if (fPreferredDownload) { sync_blocks_and_headers_from_peer = true; } else if (CanServeBlocks(*peer) && !pto->IsAddrFetchConn()) { // Typically this is an inbound peer. If we don't have any outbound @@ -5994,7 +6129,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) } } - if (!state.fSyncStarted && CanServeBlocks(*peer) && !fImporting && !fReindex && pto->CanRelay()) { + if (!fSyncStarted && CanServeBlocks(*peer) && !fImporting && !fReindex && pto->CanRelay()) { // Only actively request headers from a single peer, unless we're close to end of initial download. if ((nSyncStarted == 0 && sync_blocks_and_headers_from_peer) || m_chainman.m_best_header->GetBlockTime() > GetAdjustedTime() - nMaxTipAge) { const CBlockIndex* pindexStart = m_chainman.m_best_header; @@ -6011,14 +6146,17 @@ bool PeerManagerImpl::SendMessages(CNode* pto) if (MaybeSendGetHeaders(*pto, msg_type, m_chainman.ActiveChain().GetLocator(pindexStart), *peer)) { LogPrint(BCLog::NET, "initial %s (%d) to peer=%d (startheight:%d)\n", msg_type, pindexStart->nHeight, pto->GetId(), peer->m_starting_height); - state.fSyncStarted = true; - state.m_headers_sync_timeout = current_time + HEADERS_DOWNLOAD_TIMEOUT_BASE + + { + LOCK(state.m_mutex); + state.fSyncStarted = true; + state.m_headers_sync_timeout = current_time + HEADERS_DOWNLOAD_TIMEOUT_BASE + ( // Convert HEADERS_DOWNLOAD_TIMEOUT_PER_HEADER to microseconds before scaling // to maintain precision std::chrono::microseconds{HEADERS_DOWNLOAD_TIMEOUT_PER_HEADER} * (GetAdjustedTime() - m_chainman.m_best_header->GetBlockTime()) / consensusParams.nPowTargetSpacing ); + } nSyncStarted++; } } @@ -6037,69 +6175,77 @@ bool PeerManagerImpl::SendMessages(CNode* pto) // add all to the inv queue. LOCK(peer->m_block_inv_mutex); std::vector vHeaders; - bool fRevertToInv = ((!state.fPreferHeaders && !state.fPreferHeadersCompressed && - (!state.m_requested_hb_cmpctblocks || peer->m_blocks_for_headers_relay.size() > 1)) || - peer->m_blocks_for_headers_relay.size() > MAX_BLOCKS_TO_ANNOUNCE); + bool fRevertToInv; const CBlockIndex *pBestIndex = nullptr; // last header queued for delivery ProcessBlockAvailability(pto->GetId()); // ensure pindexBestKnownBlock is up-to-date + NodeStateRef state_ref = State(pto->GetId()); + if (!state_ref) return true; + CNodeState &state = *state_ref; + { + LOCK(state.m_mutex); + fRevertToInv = ((!state.fPreferHeaders && !state.fPreferHeadersCompressed && + (!state.m_requested_hb_cmpctblocks || peer->m_blocks_for_headers_relay.size() > 1)) || + peer->m_blocks_for_headers_relay.size() > MAX_BLOCKS_TO_ANNOUNCE); - if (!fRevertToInv) { - bool fFoundStartingHeader = false; - // Try to find first header that our peer doesn't have, and - // then send all headers past that one. If we come across any - // headers that aren't on m_chainman.ActiveChain(), give up. - for (const uint256& hash : peer->m_blocks_for_headers_relay) { - const CBlockIndex* pindex = m_chainman.m_blockman.LookupBlockIndex(hash); - assert(pindex); - if (m_chainman.ActiveChain()[pindex->nHeight] != pindex) { - // Bail out if we reorged away from this block - fRevertToInv = true; - break; - } - if (pBestIndex != nullptr && pindex->pprev != pBestIndex) { - // This means that the list of blocks to announce don't - // connect to each other. - // This shouldn't really be possible to hit during - // regular operation (because reorgs should take us to - // a chain that has some block not on the prior chain, - // which should be caught by the prior check), but one - // way this could happen is by using invalidateblock / - // reconsiderblock repeatedly on the tip, causing it to - // be added multiple times to m_blocks_for_headers_relay. - // Robustly deal with this rare situation by reverting - // to an inv. - fRevertToInv = true; - break; - } - pBestIndex = pindex; - bool isPrevDevnetGenesisBlock = false; - if (!consensusParams.hashDevnetGenesisBlock.IsNull() && - pindex->pprev != nullptr && - pindex->pprev->GetBlockHash() == consensusParams.hashDevnetGenesisBlock) { - // even though the devnet genesis block was never transferred through the wire and thus not - // appear anywhere in the node state where we track what other nodes have or not have, we can - // assume that the other node already knows the devnet genesis block - isPrevDevnetGenesisBlock = true; - } - if (fFoundStartingHeader) { - // add this to the headers message - vHeaders.push_back(pindex->GetBlockHeader()); - } else if (PeerHasHeader(&state, pindex)) { - continue; // keep looking for the first new block - } else if (pindex->pprev == nullptr || PeerHasHeader(&state, pindex->pprev) || isPrevDevnetGenesisBlock) { - // Peer doesn't have this header but they do have the prior one. - // Start sending headers. - fFoundStartingHeader = true; - vHeaders.push_back(pindex->GetBlockHeader()); - } else { - // Peer doesn't have this header or the prior one -- nothing will - // connect, so bail out. - fRevertToInv = true; - break; + if (!fRevertToInv) { + bool fFoundStartingHeader = false; + // Try to find first header that our peer doesn't have, and + // then send all headers past that one. If we come across any + // headers that aren't on m_chainman.ActiveChain(), give up. + for (const uint256& hash : peer->m_blocks_for_headers_relay) { + const CBlockIndex* pindex = m_chainman.m_blockman.LookupBlockIndex(hash); + assert(pindex); + if (m_chainman.ActiveChain()[pindex->nHeight] != pindex) { + // Bail out if we reorged away from this block + fRevertToInv = true; + break; + } + if (pBestIndex != nullptr && pindex->pprev != pBestIndex) { + // This means that the list of blocks to announce don't + // connect to each other. + // This shouldn't really be possible to hit during + // regular operation (because reorgs should take us to + // a chain that has some block not on the prior chain, + // which should be caught by the prior check), but one + // way this could happen is by using invalidateblock / + // reconsiderblock repeatedly on the tip, causing it to + // be added multiple times to m_blocks_for_headers_relay. + // Robustly deal with this rare situation by reverting + // to an inv. + fRevertToInv = true; + break; + } + pBestIndex = pindex; + bool isPrevDevnetGenesisBlock = false; + if (!consensusParams.hashDevnetGenesisBlock.IsNull() && + pindex->pprev != nullptr && + pindex->pprev->GetBlockHash() == consensusParams.hashDevnetGenesisBlock) { + // even though the devnet genesis block was never transferred through the wire and thus not + // appear anywhere in the node state where we track what other nodes have or not have, we can + // assume that the other node already knows the devnet genesis block + isPrevDevnetGenesisBlock = true; + } + if (fFoundStartingHeader) { + // add this to the headers message + vHeaders.push_back(pindex->GetBlockHeader()); + } else if (PeerHasHeader(state, pindex)) { + continue; // keep looking for the first new block + } else if (pindex->pprev == nullptr || PeerHasHeader(state, pindex->pprev) || isPrevDevnetGenesisBlock) { + // Peer doesn't have this header but they do have the prior one. + // Start sending headers. + fFoundStartingHeader = true; + vHeaders.push_back(pindex->GetBlockHeader()); + } else { + // Peer doesn't have this header or the prior one -- nothing will + // connect, so bail out. + fRevertToInv = true; + break; + } } } } if (!fRevertToInv && !vHeaders.empty()) { + LOCK(state.m_mutex); if (vHeaders.size() == 1 && state.m_requested_hb_cmpctblocks) { // We only send up to 1 block as header-and-ids, as otherwise // probably means we're doing an initial-ish-sync or they're slow @@ -6170,10 +6316,13 @@ bool PeerManagerImpl::SendMessages(CNode* pto) } // If the peer's chain has this block, don't inv it back. - if (!PeerHasHeader(&state, pindex)) { - peer->m_blocks_for_inv_relay.push_back(hashToAnnounce); - LogPrint(BCLog::NET, "%s: sending inv peer=%d hash=%s\n", __func__, - pto->GetId(), hashToAnnounce.ToString()); + { + LOCK(state.m_mutex); + if (!PeerHasHeader(state, pindex)) { + peer->m_blocks_for_inv_relay.push_back(hashToAnnounce); + LogPrint(BCLog::NET, "%s: sending inv peer=%d hash=%s\n", __func__, + pto->GetId(), hashToAnnounce.ToString()); + } } } } @@ -6319,7 +6468,11 @@ bool PeerManagerImpl::SendMessages(CNode* pto) } if (tx_relay->m_bloom_filter && !tx_relay->m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx)) continue; // Send - State(pto->GetId())->m_recently_announced_invs.insert(hash); + NodeStateRef state = State(pto->GetId()); + if (state) { + LOCK(state->m_mutex); + state->m_recently_announced_invs.insert(hash); + } nRelayedTransactions++; { // Expire old relay messages @@ -6368,39 +6521,44 @@ bool PeerManagerImpl::SendMessages(CNode* pto) // Detect whether we're stalling auto stalling_timeout = m_block_stalling_timeout.load(); - if (state.m_stalling_since.count() && state.m_stalling_since < current_time - stalling_timeout) { - // Stalling only triggers when the block download window cannot move. During normal steady state, - // the download window should be much larger than the to-be-downloaded set of blocks, so disconnection - // should only happen during initial block download. - LogPrintf("Peer=%d%s is stalling block download, disconnecting\n", pto->GetId(), fLogIPs ? strprintf(" peeraddr=%s", pto->addr.ToStringAddrPort()) : ""); - pto->fDisconnect = true; - // Increase timeout for the next peer so that we don't disconnect multiple peers if our own - // bandwidth is insufficient. - const auto new_timeout = std::min(2 * stalling_timeout, BLOCK_STALLING_TIMEOUT_MAX); - if (stalling_timeout != new_timeout && m_block_stalling_timeout.compare_exchange_strong(stalling_timeout, new_timeout)) { - LogPrint(BCLog::NET, "Increased stalling timeout temporarily to %d seconds\n", count_seconds(new_timeout)); - } - return true; - } - // In case there is a block that has been in flight from this peer for block_interval * (1 + 0.5 * N) - // (with N the number of peers from which we're downloading validated blocks), disconnect due to timeout. - // We compensate for other peers to prevent killing off peers due to our own downstream link - // being saturated. We only count validated in-flight blocks so peers can't advertise non-existing block hashes - // to unreasonably increase our timeout. - if (state.vBlocksInFlight.size() > 0) { - QueuedBlock &queuedBlock = state.vBlocksInFlight.front(); - int nOtherPeersWithValidatedDownloads = m_peers_downloading_from - 1; - if (current_time > state.m_downloading_since + std::chrono::seconds{consensusParams.nPowTargetSpacing} * (BLOCK_DOWNLOAD_TIMEOUT_BASE + BLOCK_DOWNLOAD_TIMEOUT_PER_PEER * nOtherPeersWithValidatedDownloads)) { - LogPrintf("Timeout downloading block %s from peer=%d%s, disconnecting\n", queuedBlock.pindex->GetBlockHash().ToString(), pto->GetId(), fLogIPs ? strprintf(" peeraddr=%s", pto->addr.ToStringAddrPort()) : ""); + { + LOCK(state.m_mutex); + if (state.m_stalling_since.count() && state.m_stalling_since < current_time - stalling_timeout) { + // Stalling only triggers when the block download window cannot move. During normal steady state, + // the download window should be much larger than the to-be-downloaded set of blocks, so disconnection + // should only happen during initial block download. + LogPrintf("Peer=%d%s is stalling block download, disconnecting\n", pto->GetId(), fLogIPs ? strprintf(" peeraddr=%s", pto->addr.ToStringAddrPort()) : ""); pto->fDisconnect = true; + // Increase timeout for the next peer so that we don't disconnect multiple peers if our own + // bandwidth is insufficient. + const auto new_timeout = std::min(2 * stalling_timeout, BLOCK_STALLING_TIMEOUT_MAX); + if (stalling_timeout != new_timeout && m_block_stalling_timeout.compare_exchange_strong(stalling_timeout, new_timeout)) { + LogPrint(BCLog::NET, "Increased stalling timeout temporarily to %d seconds\n", count_seconds(new_timeout)); + } return true; } + // In case there is a block that has been in flight from this peer for block_interval * (1 + 0.5 * N) + // (with N the number of peers from which we're downloading validated blocks), disconnect due to timeout. + // We compensate for other peers to prevent killing off peers due to our own downstream link + // being saturated. We only count validated in-flight blocks so peers can't advertise non-existing block hashes + // to unreasonably increase our timeout. + if (state.vBlocksInFlight.size() > 0) { + QueuedBlock &queuedBlock = state.vBlocksInFlight.front(); + int nOtherPeersWithValidatedDownloads = m_peers_downloading_from - 1; + if (current_time > state.m_downloading_since + std::chrono::seconds{consensusParams.nPowTargetSpacing} * (BLOCK_DOWNLOAD_TIMEOUT_BASE + BLOCK_DOWNLOAD_TIMEOUT_PER_PEER * nOtherPeersWithValidatedDownloads)) { + LogPrintf("Timeout downloading block %s from peer=%d%s, disconnecting\n", queuedBlock.pindex->GetBlockHash().ToString(), pto->GetId(), fLogIPs ? strprintf(" peeraddr=%s", pto->addr.ToStringAddrPort()) : ""); + pto->fDisconnect = true; + return true; + } + } } // Check for headers sync timeouts - if (state.fSyncStarted && state.m_headers_sync_timeout < std::chrono::microseconds::max()) { - // Detect whether this is a stalling initial-headers-sync peer - if (m_chainman.m_best_header->GetBlockTime() <= GetAdjustedTime() - nMaxTipAge) { - if (current_time > state.m_headers_sync_timeout && nSyncStarted == 1 && (m_num_preferred_download_peers - state.fPreferredDownload >= 1)) { + { + LOCK(state.m_mutex); + if (state.fSyncStarted && state.m_headers_sync_timeout < std::chrono::microseconds::max()) { + // Detect whether this is a stalling initial-headers-sync peer + if (m_chainman.m_best_header->GetBlockTime() <= GetAdjustedTime() - nMaxTipAge) { + if (current_time > state.m_headers_sync_timeout && nSyncStarted == 1 && (m_num_preferred_download_peers - state.fPreferredDownload >= 1)) { // Disconnect a peer (without NetPermissionFlags::NoBan permission) if it is our only sync peer, // and we have others we could be using instead. // Note: If all our peers are inbound, then we won't @@ -6427,6 +6585,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) // disconnect later. state.m_headers_sync_timeout = std::chrono::microseconds::max(); } + } } // Check that outbound peers have reasonable chains @@ -6437,20 +6596,33 @@ bool PeerManagerImpl::SendMessages(CNode* pto) // Message: getdata (blocks) // std::vector vGetData; - if (CanServeBlocks(*peer) && pto->CanRelay() && ((sync_blocks_and_headers_from_peer && !IsLimitedPeer(*peer)) || !m_chainman.ActiveChainstate().IsInitialBlockDownload()) && state.nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) { + int nBlocksInFlight; + { + LOCK(state.m_mutex); + nBlocksInFlight = state.nBlocksInFlight; + } + if (CanServeBlocks(*peer) && pto->CanRelay() && ((sync_blocks_and_headers_from_peer && !IsLimitedPeer(*peer)) || !m_chainman.ActiveChainstate().IsInitialBlockDownload()) && nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) { std::vector vToDownload; NodeId staller = -1; - FindNextBlocksToDownload(*peer, MAX_BLOCKS_IN_TRANSIT_PER_PEER - state.nBlocksInFlight, vToDownload, staller); + FindNextBlocksToDownload(*peer, MAX_BLOCKS_IN_TRANSIT_PER_PEER - nBlocksInFlight, vToDownload, staller); for (const CBlockIndex *pindex : vToDownload) { vGetData.push_back(CInv(MSG_BLOCK, pindex->GetBlockHash())); BlockRequested(pto->GetId(), *pindex); LogPrint(BCLog::NET, "Requesting block %s (%d) peer=%d\n", pindex->GetBlockHash().ToString(), pindex->nHeight, pto->GetId()); } - if (state.nBlocksInFlight == 0 && staller != -1) { - if (State(staller)->m_stalling_since == 0us) { - State(staller)->m_stalling_since = current_time; - LogPrint(BCLog::NET, "Stall started peer=%d\n", staller); + { + LOCK(state.m_mutex); + nBlocksInFlight = state.nBlocksInFlight; + } + if (nBlocksInFlight == 0 && staller != -1) { + NodeStateRef staller_state = State(staller); + if (staller_state) { + LOCK(staller_state->m_mutex); + if (staller_state->m_stalling_since == 0us) { + staller_state->m_stalling_since = current_time; + LogPrint(BCLog::NET, "Stall started peer=%d\n", staller); + } } } } @@ -6464,61 +6636,69 @@ bool PeerManagerImpl::SendMessages(CNode* pto) // were unresponsive in the past. // Eventually we should consider disconnecting peers, but this is // conservative. - if (state.m_object_download.m_check_expiry_timer <= current_time) { - for (auto it=state.m_object_download.m_object_in_flight.begin(); it != state.m_object_download.m_object_in_flight.end();) { - if (it->second <= current_time - GetObjectExpiryInterval(it->first.type)) { - LogPrint(BCLog::NET, "timeout of inflight object %s from peer=%d\n", it->first.ToString(), pto->GetId()); - state.m_object_download.m_object_announced.erase(it->first); - state.m_object_download.m_object_in_flight.erase(it++); - } else { - ++it; + { + LOCK(state.m_mutex); + if (state.m_object_download.m_check_expiry_timer <= current_time) { + for (auto it=state.m_object_download.m_object_in_flight.begin(); it != state.m_object_download.m_object_in_flight.end();) { + if (it->second <= current_time - GetObjectExpiryInterval(it->first.type)) { + LogPrint(BCLog::NET, "timeout of inflight object %s from peer=%d\n", it->first.ToString(), pto->GetId()); + state.m_object_download.m_object_announced.erase(it->first); + state.m_object_download.m_object_in_flight.erase(it++); + } else { + ++it; + } } + // On average, we do this check every GetObjectExpiryInterval. Randomize + // so that we're not doing this for all peers at the same time. + state.m_object_download.m_check_expiry_timer = current_time + GetObjectExpiryInterval(MSG_TX)/2 + GetRandMicros(GetObjectExpiryInterval(MSG_TX)); } - // On average, we do this check every GetObjectExpiryInterval. Randomize - // so that we're not doing this for all peers at the same time. - state.m_object_download.m_check_expiry_timer = current_time + GetObjectExpiryInterval(MSG_TX)/2 + GetRandMicros(GetObjectExpiryInterval(MSG_TX)); - } - - // DASH this code also handles non-TXs (Dash specific messages) - auto& object_process_time = state.m_object_download.m_object_process_time; - while (!object_process_time.empty() && object_process_time.begin()->first <= current_time && state.m_object_download.m_object_in_flight.size() < MAX_PEER_OBJECT_IN_FLIGHT) { - const CInv inv = object_process_time.begin()->second; - // Erase this entry from object_process_time (it may be added back for - // processing at a later time, see below) - object_process_time.erase(object_process_time.begin()); - if (g_erased_object_requests.count(inv.hash)) { - LogPrint(BCLog::NET, "%s -- GETDATA skipping inv=(%s), peer=%d\n", __func__, inv.ToString(), pto->GetId()); - state.m_object_download.m_object_announced.erase(inv); - state.m_object_download.m_object_in_flight.erase(inv); - continue; - } - if (!AlreadyHave(inv)) { - // If this object was last requested more than GetObjectInterval ago, - // then request. - const auto last_request_time = GetObjectRequestTime(inv); - if (last_request_time <= current_time - GetObjectInterval(inv.type)) { - LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId()); - vGetData.push_back(inv); - if (vGetData.size() >= MAX_GETDATA_SZ) { - m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); - vGetData.clear(); + + // DASH this code also handles non-TXs (Dash specific messages) + auto& object_process_time = state.m_object_download.m_object_process_time; + while (!object_process_time.empty() && object_process_time.begin()->first <= current_time && state.m_object_download.m_object_in_flight.size() < MAX_PEER_OBJECT_IN_FLIGHT) { + const CInv inv = object_process_time.begin()->second; + // Erase this entry from object_process_time (it may be added back for + // processing at a later time, see below) + object_process_time.erase(object_process_time.begin()); + bool skip_request{false}; + { + LOCK(g_object_request_mutex); + skip_request = g_erased_object_requests.count(inv.hash) != 0; + } + if (skip_request) { + LogPrint(BCLog::NET, "%s -- GETDATA skipping inv=(%s), peer=%d\n", __func__, inv.ToString(), pto->GetId()); + state.m_object_download.m_object_announced.erase(inv); + state.m_object_download.m_object_in_flight.erase(inv); + continue; + } + if (!AlreadyHave(inv)) { + // If this object was last requested more than GetObjectInterval ago, + // then request. + const auto last_request_time = GetObjectRequestTime(inv); + if (last_request_time <= current_time - GetObjectInterval(inv.type)) { + LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId()); + vGetData.push_back(inv); + if (vGetData.size() >= MAX_GETDATA_SZ) { + m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); + vGetData.clear(); + } + UpdateObjectRequestTime(inv, current_time); + state.m_object_download.m_object_in_flight.emplace(inv, current_time); + } else { + // This object is in flight from someone else; queue + // up processing to happen after the download times out + // (with a slight delay for inbound peers, to prefer + // requests to outbound peers). + const auto next_process_time = CalculateObjectGetDataTime(inv, current_time, is_masternode, !state.fPreferredDownload); + object_process_time.emplace(next_process_time, inv); + LogPrint(BCLog::NET, "%s -- GETDATA re-queue inv=(%s), next_process_time=%d, delta=%d, peer=%d\n", __func__, inv.ToString(), next_process_time.count(), (next_process_time - current_time).count(), pto->GetId()); } - UpdateObjectRequestTime(inv, current_time); - state.m_object_download.m_object_in_flight.emplace(inv, current_time); } else { - // This object is in flight from someone else; queue - // up processing to happen after the download times out - // (with a slight delay for inbound peers, to prefer - // requests to outbound peers). - const auto next_process_time = CalculateObjectGetDataTime(inv, current_time, is_masternode, !state.fPreferredDownload); - object_process_time.emplace(next_process_time, inv); - LogPrint(BCLog::NET, "%s -- GETDATA re-queue inv=(%s), next_process_time=%d, delta=%d, peer=%d\n", __func__, inv.ToString(), next_process_time.count(), (next_process_time - current_time).count(), pto->GetId()); + // We have already seen this object, no need to download. + state.m_object_download.m_object_announced.erase(inv); + state.m_object_download.m_object_in_flight.erase(inv); + LogPrint(BCLog::NET, "%s -- GETDATA already seen inv=(%s), peer=%d\n", __func__, inv.ToString(), pto->GetId()); } - } else { - // We have already seen this object, no need to download. - state.m_object_download.m_object_announced.erase(inv); - state.m_object_download.m_object_in_flight.erase(inv); - LogPrint(BCLog::NET, "%s -- GETDATA already seen inv=(%s), peer=%d\n", __func__, inv.ToString(), pto->GetId()); } } From 12f5253df1a2b105a403e2fb02e7821836b3cfc9 Mon Sep 17 00:00:00 2001 From: pasta Date: Sat, 22 Nov 2025 19:02:59 -0600 Subject: [PATCH 2/2] fix: resolve remaining thread safety issues --- src/net_processing.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 71b8dfa5050ea..4aba4323c97fa 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -660,11 +660,11 @@ class PeerManagerImpl final : public PeerManager /** Implement PeerManagerInternal */ void PeerMisbehaving(const NodeId pnode, const int howmuch, const std::string& message = "") override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); - void PeerEraseObjectRequest(const NodeId nodeid, const CInv& inv) override EXCLUSIVE_LOCKS_REQUIRED(::cs_main); + void PeerEraseObjectRequest(const NodeId nodeid, const CInv& inv) override EXCLUSIVE_LOCKS_REQUIRED(::cs_main, !g_object_request_mutex, !m_node_states_mutex); void PeerRelayInv(const CInv& inv) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void PeerRelayInvFiltered(const CInv& inv, const CTransaction& relatedTx) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void PeerRelayInvFiltered(const CInv& inv, const uint256& relatedTxHash) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); - void PeerAskPeersForTransaction(const uint256& txid) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + void PeerAskPeersForTransaction(const uint256& txid) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !g_object_request_mutex, !m_node_states_mutex); private: void _RelayTransaction(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_peer_mutex);