Skip to content

Commit 1367621

Browse files
authored
Add NetworkTransport (#2)
1 parent 224707c commit 1367621

File tree

1 file changed

+230
-0
lines changed

1 file changed

+230
-0
lines changed

Sources/MCP/Base/Transports.swift

Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,3 +170,233 @@ public actor StdioTransport: Transport {
170170
}
171171
}
172172
}
173+
174+
#if canImport(Network)
175+
import Network
176+
177+
/// Network connection based transport implementation
178+
public actor NetworkTransport: Transport {
179+
private let connection: NWConnection
180+
public nonisolated let logger: Logger
181+
182+
private var isConnected = false
183+
private let messageStream: AsyncStream<String>
184+
private let messageContinuation: AsyncStream<String>.Continuation
185+
186+
// Track connection state for continuations
187+
private var connectionContinuationResumed = false
188+
189+
public init(connection: NWConnection, logger: Logger? = nil) {
190+
self.connection = connection
191+
self.logger =
192+
logger
193+
?? Logger(
194+
label: "mcp.transport.network",
195+
factory: { _ in SwiftLogNoOpLogHandler() }
196+
)
197+
198+
// Create message stream
199+
var continuation: AsyncStream<String>.Continuation!
200+
self.messageStream = AsyncStream { continuation = $0 }
201+
self.messageContinuation = continuation
202+
}
203+
204+
/// Connects to the network transport
205+
public func connect() async throws {
206+
guard !isConnected else { return }
207+
208+
// Reset continuation state
209+
connectionContinuationResumed = false
210+
211+
// Wait for connection to be ready
212+
try await withCheckedThrowingContinuation {
213+
[weak self] (continuation: CheckedContinuation<Void, Swift.Error>) in
214+
guard let self = self else {
215+
continuation.resume(throwing: MCP.Error.internalError("Transport deallocated"))
216+
return
217+
}
218+
219+
connection.stateUpdateHandler = { [weak self] state in
220+
guard let self = self else { return }
221+
222+
Task { @MainActor in
223+
switch state {
224+
case .ready:
225+
await self.handleConnectionReady(continuation: continuation)
226+
case .failed(let error):
227+
await self.handleConnectionFailed(
228+
error: error, continuation: continuation)
229+
case .cancelled:
230+
await self.handleConnectionCancelled(continuation: continuation)
231+
default:
232+
// Wait for ready or failed state
233+
break
234+
}
235+
}
236+
}
237+
238+
// Start the connection if it's not already started
239+
if connection.state != .ready {
240+
connection.start(queue: .main)
241+
} else {
242+
Task { @MainActor in
243+
await self.handleConnectionReady(continuation: continuation)
244+
}
245+
}
246+
}
247+
}
248+
249+
private func handleConnectionReady(continuation: CheckedContinuation<Void, Swift.Error>)
250+
async
251+
{
252+
if !connectionContinuationResumed {
253+
connectionContinuationResumed = true
254+
isConnected = true
255+
logger.info("Network transport connected successfully")
256+
continuation.resume()
257+
// Start the receive loop after connection is established
258+
Task { await self.receiveLoop() }
259+
}
260+
}
261+
262+
private func handleConnectionFailed(
263+
error: Swift.Error, continuation: CheckedContinuation<Void, Swift.Error>
264+
) async {
265+
if !connectionContinuationResumed {
266+
connectionContinuationResumed = true
267+
logger.error("Connection failed: \(error)")
268+
continuation.resume(throwing: error)
269+
}
270+
}
271+
272+
private func handleConnectionCancelled(continuation: CheckedContinuation<Void, Swift.Error>)
273+
async
274+
{
275+
if !connectionContinuationResumed {
276+
connectionContinuationResumed = true
277+
logger.warning("Connection cancelled")
278+
continuation.resume(throwing: MCP.Error.internalError("Connection cancelled"))
279+
}
280+
}
281+
282+
public func disconnect() async {
283+
guard isConnected else { return }
284+
isConnected = false
285+
connection.cancel()
286+
messageContinuation.finish()
287+
logger.info("Network transport disconnected")
288+
}
289+
290+
public func send(_ message: String) async throws {
291+
guard isConnected else {
292+
throw MCP.Error.internalError("Transport not connected")
293+
}
294+
295+
guard let data = (message + "\n").data(using: .utf8) else {
296+
throw MCP.Error.internalError("Failed to encode message")
297+
}
298+
299+
// Use a local actor-isolated variable to track continuation state
300+
var sendContinuationResumed = false
301+
302+
try await withCheckedThrowingContinuation {
303+
[weak self] (continuation: CheckedContinuation<Void, Swift.Error>) in
304+
guard let self = self else {
305+
continuation.resume(throwing: MCP.Error.internalError("Transport deallocated"))
306+
return
307+
}
308+
309+
connection.send(
310+
content: data,
311+
completion: .contentProcessed { [weak self] error in
312+
guard let self = self else { return }
313+
314+
Task { @MainActor in
315+
if !sendContinuationResumed {
316+
sendContinuationResumed = true
317+
if let error = error {
318+
self.logger.error("Send error: \(error)")
319+
continuation.resume(
320+
throwing: MCP.Error.internalError("Send error: \(error)"))
321+
} else {
322+
continuation.resume()
323+
}
324+
}
325+
}
326+
})
327+
}
328+
}
329+
330+
public func receive() -> AsyncThrowingStream<String, Swift.Error> {
331+
return AsyncThrowingStream { continuation in
332+
Task {
333+
for await message in messageStream {
334+
continuation.yield(message)
335+
}
336+
continuation.finish()
337+
}
338+
}
339+
}
340+
341+
private func receiveLoop() async {
342+
var buffer = Data()
343+
344+
while isConnected && !Task.isCancelled {
345+
do {
346+
let newData = try await receiveData()
347+
buffer.append(newData)
348+
349+
// Process complete messages
350+
while let newlineIndex = buffer.firstIndex(of: UInt8(ascii: "\n")) {
351+
let messageData = buffer[..<newlineIndex]
352+
buffer = buffer[(newlineIndex + 1)...]
353+
354+
if let message = String(data: messageData, encoding: .utf8),
355+
!message.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty
356+
{
357+
logger.debug("Message received", metadata: ["message": "\(message)"])
358+
messageContinuation.yield(message)
359+
}
360+
}
361+
} catch {
362+
if !Task.isCancelled {
363+
logger.error("Receive error: \(error)")
364+
}
365+
break
366+
}
367+
}
368+
369+
messageContinuation.finish()
370+
}
371+
372+
private func receiveData() async throws -> Data {
373+
// Use a local actor-isolated variable to track continuation state
374+
var receiveContinuationResumed = false
375+
376+
return try await withCheckedThrowingContinuation {
377+
[weak self] (continuation: CheckedContinuation<Data, Swift.Error>) in
378+
guard let self = self else {
379+
continuation.resume(throwing: MCP.Error.internalError("Transport deallocated"))
380+
return
381+
}
382+
383+
connection.receive(minimumIncompleteLength: 1, maximumLength: 65536) {
384+
content, _, _, error in
385+
Task { @MainActor in
386+
if !receiveContinuationResumed {
387+
receiveContinuationResumed = true
388+
if let error = error {
389+
continuation.resume(throwing: error)
390+
} else if let content = content {
391+
continuation.resume(returning: content)
392+
} else {
393+
continuation.resume(
394+
throwing: MCP.Error.internalError("No data received"))
395+
}
396+
}
397+
}
398+
}
399+
}
400+
}
401+
}
402+
#endif

0 commit comments

Comments
 (0)