Skip to content

Commit 63c6167

Browse files
authored
Multinode (#190)
* minor fix * fix connection * broadcase safrole tickets * update logging * fix unaligned load * trace * fix tests
1 parent 86e32b5 commit 63c6167

File tree

20 files changed

+254
-69
lines changed

20 files changed

+254
-69
lines changed

Blockchain/Sources/Blockchain/RuntimeProtocols/RuntimeEvents.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ public enum RuntimeEvents {
1313

1414
// New safrole ticket generated from SafroleService
1515
public struct SafroleTicketsGenerated: Event {
16+
public let epochIndex: EpochIndex
1617
public let items: [TicketItemAndOutput]
1718
public let publicKey: Bandersnatch.PublicKey
1819
}

Blockchain/Sources/Blockchain/Validator/SafroleService.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ public final class SafroleService: ServiceBase, @unchecked Sendable {
7777
)
7878

7979
events.append(.init(
80+
epochIndex: state.value.timeslot.timeslotToEpochIndex(config: config),
8081
items: tickets,
8182
publicKey: secret.publicKey
8283
))

Blockchain/Sources/Blockchain/Validator/ServiceBase.swift

Lines changed: 8 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,43 +5,27 @@ public class ServiceBase {
55
public let id: UniqueId
66
let logger: Logger
77
public let config: ProtocolConfigRef
8-
private let eventBus: EventBus
9-
private let subscriptionTokens: ThreadSafeContainer<[EventBus.SubscriptionToken]> = .init([])
8+
private let subscriptions: EventSubscriptions
109

1110
init(id: UniqueId, config: ProtocolConfigRef, eventBus: EventBus) {
1211
self.id = id
1312
logger = Logger(label: id)
1413
self.config = config
15-
self.eventBus = eventBus
16-
}
17-
18-
deinit {
19-
let eventBus = self.eventBus
20-
let subscriptionTokens = self.subscriptionTokens
21-
Task {
22-
for token in subscriptionTokens.value {
23-
await eventBus.unsubscribe(token: token)
24-
}
25-
}
14+
subscriptions = EventSubscriptions(eventBus: eventBus)
2615
}
2716

2817
@discardableResult
29-
func subscribe<T: Event>(_ eventType: T.Type, id _: UniqueId, handler: @escaping @Sendable (T) async throws -> Void) async -> EventBus
30-
.SubscriptionToken
31-
{
32-
let token = await eventBus.subscribe(eventType, handler: handler)
33-
subscriptionTokens.write { $0.append(token) }
34-
return token
18+
func subscribe<T: Event>(
19+
_ eventType: T.Type, id _: UniqueId, handler: @escaping @Sendable (T) async throws -> Void
20+
) async -> EventBus.SubscriptionToken {
21+
await subscriptions.subscribe(eventType, id: id, handler: handler)
3522
}
3623

3724
func unsubscribe(token: EventBus.SubscriptionToken) async {
38-
subscriptionTokens.write { tokens in
39-
tokens.removeAll { $0 == token }
40-
}
41-
await eventBus.unsubscribe(token: token)
25+
await subscriptions.unsubscribe(token: token)
4226
}
4327

4428
func publish(_ event: some Event) {
45-
eventBus.publish(event)
29+
subscriptions.publish(event)
4630
}
4731
}

Blockchain/Tests/BlockchainTests/ExtrinsicPoolServiceTests.swift

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,11 @@ struct ExtrinsicPoolServiceTests {
5656

5757
allTickets.append(contentsOf: tickets)
5858

59-
let event = RuntimeEvents.SafroleTicketsGenerated(items: tickets, publicKey: secretKey.publicKey)
59+
let event = RuntimeEvents.SafroleTicketsGenerated(
60+
epochIndex: state.value.timeslot.timeslotToEpochIndex(config: config),
61+
items: tickets,
62+
publicKey: secretKey.publicKey
63+
)
6064
await eventBus.publish(event)
6165

6266
// Wait for the event to be processed
@@ -126,7 +130,11 @@ struct ExtrinsicPoolServiceTests {
126130
idx: 0
127131
)
128132

129-
let addEvent = RuntimeEvents.SafroleTicketsGenerated(items: tickets, publicKey: secretKey.publicKey)
133+
let addEvent = RuntimeEvents.SafroleTicketsGenerated(
134+
epochIndex: state.value.timeslot.timeslotToEpochIndex(config: config),
135+
items: tickets,
136+
publicKey: secretKey.publicKey
137+
)
130138
await eventBus.publish(addEvent)
131139

132140
// Wait for the event to be processed
@@ -173,7 +181,11 @@ struct ExtrinsicPoolServiceTests {
173181
idx: 0
174182
)
175183

176-
let addEvent = RuntimeEvents.SafroleTicketsGenerated(items: oldTickets, publicKey: secretKey.publicKey)
184+
let addEvent = RuntimeEvents.SafroleTicketsGenerated(
185+
epochIndex: state.value.timeslot.timeslotToEpochIndex(config: config),
186+
items: oldTickets,
187+
publicKey: secretKey.publicKey
188+
)
177189
await eventBus.publish(addEvent)
178190
await storeMiddleware.wait()
179191

@@ -216,7 +228,11 @@ struct ExtrinsicPoolServiceTests {
216228
)
217229

218230
// Ensure new tickets are accepted
219-
let newAddEvent = RuntimeEvents.SafroleTicketsGenerated(items: newTickets, publicKey: secretKey.publicKey)
231+
let newAddEvent = RuntimeEvents.SafroleTicketsGenerated(
232+
epochIndex: newState.value.timeslot.timeslotToEpochIndex(config: config),
233+
items: newTickets,
234+
publicKey: secretKey.publicKey
235+
)
220236
await eventBus.publish(newAddEvent)
221237
await storeMiddleware.wait()
222238

Boka/.swiftpm/xcode/xcshareddata/xcschemes/Boka.xcscheme

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@
6767
argument = "--validator"
6868
isEnabled = "YES">
6969
</CommandLineArgument>
70+
<CommandLineArgument
71+
argument = "--chain=minimal"
72+
isEnabled = "YES">
73+
</CommandLineArgument>
7074
</CommandLineArguments>
7175
<EnvironmentVariables>
7276
<EnvironmentVariable

Boka/Package.resolved

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Boka/Sources/Boka.swift

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ enum MaybeEnabled<T: ExpressibleByArgument>: ExpressibleByArgument {
2727
case disabled
2828

2929
init?(argument: String) {
30-
if argument.lowercased() == "false" {
30+
if argument.lowercased() == "no" {
3131
self = .disabled
3232
} else {
3333
guard let argument = T(argument: argument) else {
@@ -60,11 +60,11 @@ struct Boka: AsyncParsableCommand {
6060
@Option(name: .long, help: "A preset config or path to chain config file.")
6161
var chain: Genesis = .preset(.dev)
6262

63-
@Option(name: .long, help: "Listen address for RPC server. Pass 'false' to disable RPC server. Default to 127.0.0.1:9955.")
63+
@Option(name: .long, help: "Listen address for RPC server. Pass 'no' to disable RPC server. Default to 127.0.0.1:9955.")
6464
var rpc: MaybeEnabled<NetAddr> = .enabled(NetAddr(address: "127.0.0.1:9955")!)
6565

6666
@Option(name: .long, help: "Listen address for P2P protocol.")
67-
var p2p: NetAddr = .init(address: "127.0.0.1:19955")!
67+
var p2p: NetAddr = .init(address: "127.0.0.1:0")!
6868

6969
@Option(name: .long, help: "Specify peer P2P addresses.")
7070
var peers: [NetAddr] = []
@@ -114,12 +114,11 @@ struct Boka: AsyncParsableCommand {
114114
}
115115

116116
let rpcConfig = rpc.asOptional.map { addr -> RPCConfig in
117-
logger.info("RPC listen address: \(addr)")
118117
let (address, port) = addr.getAddressAndPort()
119118
return RPCConfig(listenAddress: address, port: Int(port))
120119
}
121120

122-
let keystore = try await DevKeyStore()
121+
let keystore = try await DevKeyStore(devKeysCount: devSeed == nil ? 12 : 0)
123122

124123
let networkKey: Ed25519.SecretKey = try await {
125124
if let devSeed {
@@ -130,6 +129,7 @@ struct Boka: AsyncParsableCommand {
130129
}
131130
}()
132131

132+
logger.info("Network key: \(networkKey.publicKey.data.toHexString())")
133133
let networkConfig = NetworkConfig(
134134
mode: validator ? .validator : .builder,
135135
listenAddress: p2p,
@@ -144,7 +144,7 @@ struct Boka: AsyncParsableCommand {
144144
handlerMiddleware: .tracing(prefix: "Handler")
145145
)
146146

147-
let config = Node.Config(rpc: rpcConfig, network: networkConfig)
147+
let config = Node.Config(rpc: rpcConfig, network: networkConfig, peers: peers)
148148

149149
let node: Node = if validator {
150150
try await ValidatorNode(

Networking/Sources/MsQuicSwift/NetAddr.swift

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import msquic
88
import Darwin
99
#endif
1010

11-
public struct NetAddr: Sendable {
11+
public struct NetAddr: Sendable, Equatable, Hashable {
1212
var quicAddr: QUIC_ADDR
1313

1414
public init?(address: String) {
@@ -38,17 +38,13 @@ public struct NetAddr: Sendable {
3838
let (host, port, _) = parseQuicAddr(quicAddr) ?? ("::dead:beef", 0, false)
3939
return (host, port)
4040
}
41-
}
4241

43-
extension NetAddr: Equatable {
4442
public static func == (lhs: NetAddr, rhs: NetAddr) -> Bool {
4543
var addr1 = lhs.quicAddr
4644
var addr2 = rhs.quicAddr
4745
return QuicAddrCompare(&addr1, &addr2) == 1
4846
}
49-
}
5047

51-
extension NetAddr: Hashable {
5248
public func hash(into hasher: inout Hasher) {
5349
var addr = quicAddr
5450
let hash = QuicAddrHash(&addr)

Networking/Sources/MsQuicSwift/QuicConnection.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,9 @@ private class ConnectionHandle {
250250

251251
case QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE:
252252
logger.trace("Shutdown complete")
253+
if let connection {
254+
connection.handler.shutdownComplete(connection)
255+
}
253256
if event.pointee.SHUTDOWN_COMPLETE.AppCloseInProgress == 0 {
254257
// avoid closing twice
255258
api.call { api in

Networking/Sources/MsQuicSwift/QuicEventHandler.swift

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public protocol QuicEventHandler: Sendable {
3333
func shouldOpen(_ connection: QuicConnection, certificate: Data?) -> QuicStatus
3434
func connected(_ connection: QuicConnection)
3535
func shutdownInitiated(_ connection: QuicConnection, reason: ConnectionCloseReason)
36+
func shutdownComplete(_ connection: QuicConnection)
3637
func streamStarted(_ connect: QuicConnection, stream: QuicStream)
3738

3839
// stream events
@@ -55,6 +56,7 @@ extension QuicEventHandler {
5556
public func connected(_: QuicConnection) {}
5657

5758
public func shutdownInitiated(_: QuicConnection, reason _: ConnectionCloseReason) {}
59+
public func shutdownComplete(_: QuicConnection) {}
5860

5961
public func streamStarted(_: QuicConnection, stream _: QuicStream) {}
6062

@@ -69,6 +71,7 @@ public final class MockQuicEventHandler: QuicEventHandler {
6971
case shouldOpen(connection: QuicConnection, certificate: Data?)
7072
case connected(connection: QuicConnection)
7173
case shutdownInitiated(connection: QuicConnection, reason: ConnectionCloseReason)
74+
case shutdownComplete(connection: QuicConnection)
7275
case streamStarted(connection: QuicConnection, stream: QuicStream)
7376
case dataReceived(stream: QuicStream, data: Data)
7477
case closed(stream: QuicStream, status: QuicStatus, code: QuicErrorCode)
@@ -107,6 +110,12 @@ public final class MockQuicEventHandler: QuicEventHandler {
107110
}
108111
}
109112

113+
public func shutdownComplete(_ connection: QuicConnection) {
114+
events.write { events in
115+
events.append(.shutdownComplete(connection: connection))
116+
}
117+
}
118+
110119
public func streamStarted(_ connect: QuicConnection, stream: QuicStream) {
111120
events.write { events in
112121
events.append(.streamStarted(connection: connect, stream: stream))

0 commit comments

Comments
 (0)