Skip to content

Commit 7a5877d

Browse files
committed
update peer test
1 parent d2c2e63 commit 7a5877d

File tree

4 files changed

+34
-34
lines changed

4 files changed

+34
-34
lines changed

Networking/Sources/MsQuicSwift/QuicStream.swift

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,9 +198,18 @@ private class StreamHandle {
198198
logger.warning("Stream received data but it is already gone?")
199199
}
200200
}
201+
if event.pointee.RECEIVE.Flags.rawValue & QUIC_RECEIVE_FLAG_FIN.rawValue != 0 {
202+
// maybe close function need it
203+
if let stream {
204+
stream.handler.dataReceived(stream, data: Data())
205+
}
206+
}
201207

202208
case QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN:
203209
logger.trace("Peer send shutdown")
210+
api.call { api in
211+
_ = api.pointee.StreamShutdown(ptr, QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0)
212+
}
204213

205214
case QUIC_STREAM_EVENT_PEER_SEND_ABORTED:
206215
logger.trace("Peer send aborted")

Networking/Sources/Networking/Connection.swift

Lines changed: 21 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ public protocol ConnectionInfoProtocol {
1414

1515
enum ConnectionError: Error {
1616
case receiveFailed
17+
case invalidLength
1718
}
1819

1920
public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoProtocol {
@@ -42,26 +43,30 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
4243
try? connection.shutdown(errorCode: abort ? 1 : 0) // TODO: define some error code
4344
}
4445

45-
public func request(_ request: Handler.EphemeralHandler.Request) async throws -> Data {
46+
public func decodeLength(from data: Data) throws -> UInt32 {
47+
guard data.count >= 4 else {
48+
throw ConnectionError.invalidLength
49+
}
50+
return UInt32(littleEndian: data.withUnsafeBytes { $0.loadUnaligned(as: UInt32.self) })
51+
}
52+
53+
public func request(_ request: Handler.EphemeralHandler.Request) async throws -> [Data] {
4654
let data = try request.encode()
4755
let kind = request.kind
4856
let stream = try createStream(kind: kind)
4957
try stream.send(message: data)
50-
// TODO: pipe this to decoder directly to be able to reject early
51-
var response = Data()
58+
59+
var reps = [Data()]
5260
while let nextData = await stream.receive() {
53-
response.append(nextData)
54-
break
55-
}
56-
guard response.count >= 4 else {
57-
stream.close(abort: true)
58-
throw ConnectionError.receiveFailed
61+
if nextData.isEmpty { // fin flag
62+
break
63+
}
64+
let length = try decodeLength(from: nextData.prefix(4))
65+
let data = nextData.dropFirst(4).prefix(Int(length))
66+
reps.append(data)
5967
}
60-
let lengthData = response.prefix(4)
61-
let length = UInt32(
62-
littleEndian: lengthData.withUnsafeBytes { $0.loadUnaligned(as: UInt32.self) }
63-
)
64-
return response.dropFirst(4).prefix(Int(length))
68+
69+
return reps
6570
}
6671

6772
@discardableResult
@@ -121,7 +126,6 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
121126
return
122127
}
123128
if let upKind = Handler.PresistentHandler.StreamKind(rawValue: byte) {
124-
// TODO: handle duplicated UP streams
125129
presistentStreams.write { presistentStreams in
126130
presistentStreams[upKind] = stream
127131
}
@@ -134,25 +138,20 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
134138
var decoder = impl.ephemeralStreamHandler.createDecoder(kind: ceKind)
135139

136140
let lengthData = await stream.receive(count: 4)
137-
guard let lengthData else {
141+
guard let lengthData, let length = try? decodeLength(from: lengthData) else {
138142
stream.close(abort: true)
139143
logger.debug("Invalid request length")
140144
return
141145
}
142-
let length = UInt32(littleEndian: lengthData.withUnsafeBytes { $0.loadUnaligned(as: UInt32.self) })
143-
// sanity check for length
144-
// TODO: pick better value
145146
guard length < 1024 * 1024 * 10 else {
146147
stream.close(abort: true)
147148
logger.debug("Invalid request length: \(length)")
148-
// TODO: report bad peer
149149
return
150150
}
151151
let data = await stream.receive(count: Int(length))
152152
guard let data else {
153153
stream.close(abort: true)
154154
logger.debug("Invalid request data")
155-
// TODO: report bad peer
156155
return
157156
}
158157
let request = try decoder.decode(data: data)
@@ -196,16 +195,12 @@ func presistentStreamRunLoop<Handler: StreamHandler>(
196195
do {
197196
while true {
198197
let lengthData = await stream.receive(count: 4)
199-
guard let lengthData else {
198+
guard let lengthData, let length = try? connection.decodeLength(from: lengthData) else {
200199
break
201200
}
202-
let length = UInt32(littleEndian: lengthData.withUnsafeBytes { $0.loadUnaligned(as: UInt32.self) })
203-
// sanity check for length
204-
// TODO: pick better value
205201
guard length < 1024 * 1024 * 10 else {
206202
stream.close(abort: true)
207203
logger.debug("Invalid message length: \(length)")
208-
// TODO: report bad peer
209204
return
210205
}
211206
let data = await stream.receive(count: Int(length))

Networking/Sources/Networking/Stream.swift

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,6 @@ final class Stream<Handler: StreamHandler>: Sendable, StreamProtocol {
8585
}
8686

8787
func received(data: Data) {
88-
if data.isEmpty {
89-
return
90-
}
91-
9288
if !channel.syncSend(data) {
9389
logger.warning("stream \(id) is full")
9490
// TODO: backpressure handling

Networking/Tests/NetworkingTests/PeerTests.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -207,19 +207,19 @@ struct PeerTests {
207207
)
208208
try? await Task.sleep(for: .milliseconds(100))
209209

210-
let data1 = try await connection1.request(
210+
let dataList1 = try await connection1.request(
211211
MockRequest(kind: .typeA, data: Data("hello world".utf8))
212212
)
213-
#expect(data1 == Data("hello world".utf8))
213+
#expect(dataList1.first == Data("hello world".utf8))
214214

215215
let connection2 = try peer2.connect(
216216
to: NetAddr(ipAddress: "127.0.0.1", port: 8083)!, role: .validator
217217
)
218218
try? await Task.sleep(for: .milliseconds(100))
219219

220-
let data2 = try await connection2.request(
220+
let dataList2 = try await connection2.request(
221221
MockRequest(kind: .typeB, data: Data("I am jam".utf8))
222222
)
223-
#expect(data2 == Data("I am jam".utf8))
223+
#expect(dataList2.first == Data("I am jam".utf8))
224224
}
225225
}

0 commit comments

Comments
 (0)