Skip to content

Commit 3e2af6c

Browse files
committed
syncing
1 parent cc39378 commit 3e2af6c

File tree

9 files changed

+293
-63
lines changed

9 files changed

+293
-63
lines changed

Blockchain/Sources/Blockchain/Blockchain.swift

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ public final class Blockchain: ServiceBase, @unchecked Sendable {
2929
public func importBlock(_ block: BlockRef) async throws {
3030
logger.debug("importing block: #\(block.header.timeslot) \(block.hash)")
3131

32+
if try await dataProvider.hasBlock(hash: block.hash) {
33+
logger.debug("block already imported", metadata: ["hash": "\(block.hash)"])
34+
return
35+
}
36+
3237
try await withSpan("importBlock") { span in
3338
span.attributes.blockHash = block.hash.description
3439

Blockchain/Sources/Blockchain/Types/Header.swift

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,35 @@ extension Header {
133133
}
134134
}
135135

136-
public typealias HeaderRef = Ref<Header>
136+
public final class HeaderRef: Ref<Header>, @unchecked Sendable {
137+
public required init(_ value: Header) {
138+
lazyHash = Lazy {
139+
Ref(value.hash())
140+
}
141+
142+
super.init(value)
143+
}
144+
145+
private let lazyHash: Lazy<Ref<Data32>>
146+
147+
public var hash: Data32 {
148+
lazyHash.value.value
149+
}
150+
151+
override public var description: String {
152+
"Header(hash: \(hash), timeslot: \(value.timeslot))"
153+
}
154+
}
155+
156+
extension HeaderRef: Codable {
157+
public convenience init(from decoder: Decoder) throws {
158+
try self.init(.init(from: decoder))
159+
}
160+
161+
public func encode(to encoder: Encoder) throws {
162+
try value.encode(to: encoder)
163+
}
164+
}
137165

138166
extension Header.Unsigned: Dummy {
139167
public typealias Config = ProtocolConfigRef
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import Utils
2+
3+
public struct BlockRequest: Codable, Sendable {
4+
public enum Direction: UInt8, Codable, Sendable {
5+
case ascendingExcludsive = 0
6+
case descendingInclusive = 1
7+
}
8+
9+
public var hash: Data32
10+
public var direction: Direction
11+
public var maxBlocks: UInt32
12+
}

Node/Sources/Node/NetworkingProtocol/CommonEphemeral/CERequest.swift

Lines changed: 12 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import Foundation
44
import Networking
55

66
public enum CERequest: Sendable {
7+
case blockRequest(BlockRequest)
78
case safroleTicket1(SafroleTicketMessage)
89
case safroleTicket2(SafroleTicketMessage)
910
}
@@ -13,6 +14,8 @@ extension CERequest: RequestProtocol {
1314

1415
public func encode() throws -> Data {
1516
switch self {
17+
case let .blockRequest(message):
18+
try JamEncoder.encode(message)
1619
case let .safroleTicket1(message):
1720
try JamEncoder.encode(message)
1821
case let .safroleTicket2(message):
@@ -22,6 +25,8 @@ extension CERequest: RequestProtocol {
2225

2326
public var kind: CommonEphemeralStreamKind {
2427
switch self {
28+
case .blockRequest:
29+
.blockRequest
2530
case .safroleTicket1:
2631
.safroleTicket1
2732
case .safroleTicket2:
@@ -31,6 +36,8 @@ extension CERequest: RequestProtocol {
3136

3237
static func getType(kind: CommonEphemeralStreamKind) -> Decodable.Type {
3338
switch kind {
39+
case .blockRequest:
40+
BlockRequest.self
3441
case .safroleTicket1:
3542
SafroleTicketMessage.self
3643
case .safroleTicket2:
@@ -42,6 +49,11 @@ extension CERequest: RequestProtocol {
4249

4350
static func from(kind: CommonEphemeralStreamKind, data: any Decodable) -> CERequest? {
4451
switch kind {
52+
case .blockRequest:
53+
guard let message = data as? BlockRequest else {
54+
return nil
55+
}
56+
return .blockRequest(message)
4557
case .safroleTicket1:
4658
guard let message = data as? SafroleTicketMessage else {
4759
return nil
@@ -57,31 +69,3 @@ extension CERequest: RequestProtocol {
5769
}
5870
}
5971
}
60-
61-
extension CERequest {
62-
public func handle(blockchain: Blockchain) async throws -> (any Encodable)? {
63-
switch self {
64-
case let .safroleTicket1(message):
65-
blockchain.publish(event: RuntimeEvents.SafroleTicketsReceived(
66-
items: [
67-
ExtrinsicTickets.TicketItem(
68-
attempt: message.attempt,
69-
signature: message.proof
70-
),
71-
]
72-
))
73-
// TODO: rebroadcast to other peers after some time
74-
return nil
75-
case let .safroleTicket2(message):
76-
blockchain.publish(event: RuntimeEvents.SafroleTicketsReceived(
77-
items: [
78-
ExtrinsicTickets.TicketItem(
79-
attempt: message.attempt,
80-
signature: message.proof
81-
),
82-
]
83-
))
84-
return nil
85-
}
86-
}
87-
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import Blockchain
2+
import Utils
3+
4+
public enum NetworkEvents {
5+
public struct PeerAdded: Event {
6+
public let info: PeerInfo
7+
}
8+
9+
public struct PeerUpdated: Event {
10+
public let info: PeerInfo
11+
public let newBlockHeader: HeaderRef
12+
}
13+
14+
public struct BulkSyncCompleted: Event {}
15+
}

Node/Sources/Node/NetworkingProtocol/NetworkManager.swift

Lines changed: 58 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,17 @@ import Utils
77

88
private let logger = Logger(label: "NetworkManager")
99

10-
enum SendTarget {
10+
let MAX_BLOCKS_PER_REQUEST: UInt32 = 50
11+
12+
enum BroadcastTarget {
1113
case safroleStep1Validator
1214
case currentValidators
1315
}
1416

1517
public final class NetworkManager: Sendable {
1618
private let peerManager: PeerManager
1719
private let network: Network
20+
private let syncManager: SyncManager
1821
private let blockchain: Blockchain
1922
private let subscriptions: EventSubscriptions
2023

@@ -23,16 +26,19 @@ public final class NetworkManager: Sendable {
2326
private let devPeers: Set<NetAddr>
2427

2528
public init(config: Network.Config, blockchain: Blockchain, eventBus: EventBus, devPeers: Set<NetAddr>) async throws {
26-
peerManager = PeerManager()
29+
peerManager = PeerManager(eventBus: eventBus)
2730

28-
let handler = HandlerImpl(blockchain: blockchain, peerManager: peerManager)
2931
network = try await Network(
3032
config: config,
3133
protocolConfig: blockchain.config,
3234
genesisHeader: blockchain.dataProvider.genesisBlockHash,
33-
handler: handler
35+
handler: HandlerImpl(blockchain: blockchain, peerManager: peerManager)
36+
)
37+
syncManager = SyncManager(
38+
blockchain: blockchain, network: network, peerManager: peerManager, eventBus: eventBus
3439
)
3540
self.blockchain = blockchain
41+
3642
subscriptions = EventSubscriptions(eventBus: eventBus)
3743

3844
self.devPeers = devPeers
@@ -53,7 +59,7 @@ public final class NetworkManager: Sendable {
5359
}
5460
}
5561

56-
private func getSendTarget(target: SendTarget) -> Set<NetAddr> {
62+
private func getAddresses(target: BroadcastTarget) -> Set<NetAddr> {
5763
// TODO: get target from onchain state
5864
switch target {
5965
case .safroleStep1Validator:
@@ -65,31 +71,29 @@ public final class NetworkManager: Sendable {
6571
}
6672
}
6773

68-
private func send<R: Decodable>(
74+
private func send(to: NetAddr, message: CERequest) async throws -> Data {
75+
try await network.send(to: to, message: message)
76+
}
77+
78+
private func broadcast(
79+
to: BroadcastTarget,
6980
message: CERequest,
70-
target: SendTarget,
71-
responseType: R.Type,
72-
responseHandler: @Sendable @escaping (Result<R, Error>) async -> Void
81+
responseHandler: @Sendable @escaping (Result<Data, Error>) async -> Void
7382
) async {
74-
let targets = getSendTarget(target: target)
83+
let targets = getAddresses(target: to)
7584
for target in targets {
7685
Task {
7786
logger.trace("sending message", metadata: ["target": "\(target)", "message": "\(message)"])
7887
let res = await Result {
7988
try await network.send(to: target, message: message)
8089
}
81-
.flatMap { data in
82-
Result(catching: {
83-
try JamDecoder.decode(responseType, from: data, withConfig: blockchain.config)
84-
})
85-
}
8690
await responseHandler(res)
8791
}
8892
}
8993
}
9094

91-
private func send(message: CERequest, target: SendTarget) async {
92-
let targets = getSendTarget(target: target)
95+
private func broadcast(to: BroadcastTarget, message: CERequest) async {
96+
let targets = getAddresses(target: to)
9397
for target in targets {
9498
Task {
9599
logger.trace("sending message", metadata: ["target": "\(target)", "message": "\(message)"])
@@ -103,13 +107,13 @@ public final class NetworkManager: Sendable {
103107
private func on(safroleTicketsGenerated event: RuntimeEvents.SafroleTicketsGenerated) async {
104108
logger.trace("sending tickets", metadata: ["epochIndex": "\(event.epochIndex)"])
105109
for ticket in event.items {
106-
await send(
110+
await broadcast(
111+
to: .safroleStep1Validator,
107112
message: .safroleTicket1(.init(
108113
epochIndex: event.epochIndex,
109114
attempt: ticket.ticket.attempt,
110115
proof: ticket.ticket.signature
111-
)),
112-
target: .safroleStep1Validator
116+
))
113117
)
114118
}
115119
}
@@ -125,6 +129,40 @@ struct HandlerImpl: NetworkProtocolHandler {
125129

126130
func handle(ceRequest: CERequest) async throws -> (any Encodable)? {
127131
switch ceRequest {
132+
case let .blockRequest(message):
133+
let dataProvider = blockchain.dataProvider
134+
let count = min(MAX_BLOCKS_PER_REQUEST, message.maxBlocks)
135+
var resp = [BlockRef]()
136+
resp.reserveCapacity(Int(count))
137+
switch message.direction {
138+
case .ascendingExcludsive:
139+
let number = try await dataProvider.getBlockNumber(hash: message.hash)
140+
var currentHash = message.hash
141+
for i in 1 ... count {
142+
let hashes = try await dataProvider.getBlockHash(byNumber: number + i)
143+
var found = false
144+
for hash in hashes {
145+
let block = try await dataProvider.getBlock(hash: hash)
146+
if block.header.parentHash == currentHash {
147+
resp.append(block)
148+
found = true
149+
currentHash = hash
150+
break
151+
}
152+
}
153+
if !found {
154+
break
155+
}
156+
}
157+
case .descendingInclusive:
158+
var hash = message.hash
159+
for _ in 0 ..< count {
160+
let block = try await dataProvider.getBlock(hash: hash)
161+
resp.append(block)
162+
hash = block.header.parentHash
163+
}
164+
}
165+
return resp
128166
case let .safroleTicket1(message):
129167
blockchain.publish(event: RuntimeEvents.SafroleTicketsReceived(
130168
items: [

Node/Sources/Node/NetworkingProtocol/PeerManager.swift

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,28 @@ import Utils
66

77
private let logger = Logger(label: "PeerManager")
88

9-
public struct PeerInfo {
9+
public struct PeerInfo: Sendable {
1010
public let address: NetAddr
1111
public internal(set) var finalized: HashAndSlot
1212
public internal(set) var heads: Set<HashAndSlot> = []
13+
14+
public var best: HashAndSlot? {
15+
heads.max { $0.timeslot < $1.timeslot }
16+
}
1317
}
1418

19+
// TODOs:
20+
// - distinguish between connect peers and offline peers
21+
// - peer reputation
22+
// - purge offline peers
1523
public actor PeerManager: Sendable {
16-
private var peers: [NetAddr: PeerInfo] = [:]
24+
private let eventBus: EventBus
25+
26+
public private(set) var peers: [NetAddr: PeerInfo] = [:]
1727

18-
init() {}
28+
init(eventBus: EventBus) {
29+
self.eventBus = eventBus
30+
}
1931

2032
func addPeer(address: NetAddr, handshake: BlockAnnouncementHandshake) {
2133
var peer = PeerInfo(
@@ -28,35 +40,35 @@ public actor PeerManager: Sendable {
2840
peers[address] = peer
2941

3042
logger.debug("added peer", metadata: ["address": "\(address)", "finalized": "\(peer.finalized)"])
43+
eventBus.publish(NetworkEvents.PeerAdded(info: peer))
3144
}
3245

3346
func updatePeer(address: NetAddr, message: BlockAnnouncement) {
47+
let updatedPeer: PeerInfo
3448
if var peer = peers[address] {
3549
peer.finalized = message.finalized
3650
// purge heads that are older than the finalized head
3751
// or if it is the parent of the new block
3852
// this means if some blocks are skipped, it is possible that we miss purge some heads
3953
// that is ancestor of the new block. but that's fine
4054
peer.heads = peer.heads.filter { head in
41-
head.timeslot > message.finalized.timeslot && head.hash != message.header.parentHash
55+
head.timeslot > message.finalized.timeslot && head.hash != message.header.value.parentHash
4256
}
43-
peer.heads.insert(HashAndSlot(hash: message.header.hash(), timeslot: message.header.timeslot))
44-
peers[address] = peer
57+
peer.heads.insert(HashAndSlot(hash: message.header.hash, timeslot: message.header.value.timeslot))
58+
updatedPeer = peer
4559
} else {
4660
// this shouldn't happen but let's handle it
47-
peers[address] = PeerInfo(
61+
updatedPeer = PeerInfo(
4862
address: address,
4963
finalized: message.finalized,
5064
heads: [
51-
HashAndSlot(hash: message.header.hash(), timeslot: message.header.timeslot),
65+
HashAndSlot(hash: message.header.hash, timeslot: message.header.value.timeslot),
5266
]
5367
)
5468
}
69+
peers[address] = updatedPeer
5570

56-
logger.debug("updated peer", metadata: ["address": "\(address)", "finalized": "\(peers[address]!.finalized)"])
57-
}
58-
59-
public func getPeer(address: NetAddr) -> PeerInfo? {
60-
peers[address]
71+
logger.debug("updated peer", metadata: ["address": "\(address)", "finalized": "\(updatedPeer.finalized)"])
72+
eventBus.publish(NetworkEvents.PeerUpdated(info: updatedPeer, newBlockHeader: message.header))
6173
}
6274
}

0 commit comments

Comments
 (0)