Skip to content

Commit 6e61eb5

Browse files
committed
refactor(core): migrate SSE streaming to coroutines
Replace RxJava-based SSE streaming with kotlinx.coroutines Flow and ProducerScope for improved async handling and code simplification. Also remove deprecated AsyncFileEditorProvider usage. Bump mppVersion to 0.3.6.
1 parent b8240a7 commit 6e61eb5

File tree

5 files changed

+31
-51
lines changed

5 files changed

+31
-51
lines changed

core/src/main/kotlin/cc/unitmesh/devti/llms/custom/CustomSSEProcessor.kt

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,8 @@ import com.intellij.openapi.components.service
1515
import com.intellij.openapi.diagnostic.logger
1616
import com.intellij.openapi.project.Project
1717
import com.jayway.jsonpath.JsonPath
18-
import io.reactivex.rxjava3.core.BackpressureStrategy
19-
import io.reactivex.rxjava3.core.Flowable
20-
import io.reactivex.rxjava3.core.FlowableEmitter
2118
import kotlinx.coroutines.Dispatchers
19+
import kotlinx.coroutines.channels.ProducerScope
2220
import kotlinx.coroutines.channels.awaitClose
2321
import kotlinx.coroutines.flow.Flow
2422
import kotlinx.coroutines.flow.callbackFlow
@@ -71,26 +69,22 @@ open class CustomSSEProcessor(private val project: Project) {
7169

7270
@OptIn(kotlinx.coroutines.ExperimentalCoroutinesApi::class)
7371
fun streamSSE(call: Call, promptText: String, keepHistory: Boolean = false, messages: MutableList<Message>): Flow<String> {
74-
var emit: FlowableEmitter<SSE>? = null
75-
val sseFlowable = Flowable
76-
.create({ emitter: FlowableEmitter<SSE> ->
77-
emit = emitter.apply { call.enqueue(ResponseBodyCallback(emitter, true)) }
78-
}, BackpressureStrategy.BUFFER)
72+
var producerScope: ProducerScope<SSE>? = null
73+
val sseFlow = callbackFlow {
74+
producerScope = this
75+
call.enqueue(ResponseBodyCallback(this, true))
76+
awaitClose { }
77+
}
7978

8079
try {
8180
var output = ""
8281
var reasonerOutput = ""
8382
return CustomFlowWrapper(callbackFlow {
8483
withContext(Dispatchers.IO) {
85-
sseFlowable
86-
.doOnError {
87-
it.printStackTrace()
88-
trySend(it.message ?: "Error occurs")
89-
close()
90-
}
91-
.blockingForEach { sse ->
84+
try {
85+
sseFlow.collect { sse ->
9286
if (sse.data == "[DONE]") {
93-
return@blockingForEach
87+
return@collect
9488
}
9589

9690
if (responseFormat.isNotEmpty()) {
@@ -148,6 +142,11 @@ open class CustomSSEProcessor(private val project: Project) {
148142
}
149143
}
150144
}
145+
} catch (e: Exception) {
146+
e.printStackTrace()
147+
trySend(e.message ?: "Error occurs")
148+
close()
149+
}
151150

152151
// when stream finished, check if any response parsed succeeded
153152
// if not, notice user check response format
@@ -176,7 +175,7 @@ open class CustomSSEProcessor(private val project: Project) {
176175
close()
177176
}
178177
awaitClose()
179-
}).also { it.cancelCallback { emit?.onComplete() } }
178+
}).also { it.cancelCallback { producerScope?.close() } }
180179
} catch (e: Exception) {
181180
if (hasSuccessRequest) {
182181
logger.info("Failed to stream", e)

core/src/main/kotlin/cc/unitmesh/devti/llms/custom/ResponseBodyCallback.kt

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
package cc.unitmesh.devti.llms.custom
2323

2424
import com.intellij.openapi.diagnostic.logger
25-
import io.reactivex.rxjava3.core.FlowableEmitter
25+
import kotlinx.coroutines.channels.ProducerScope
2626
import okhttp3.Call
2727
import okhttp3.Callback
2828
import okhttp3.Response
@@ -39,10 +39,9 @@ class AutoDevHttpException(error: String, private val statusCode: Int) : Runtime
3939

4040
/**
4141
* Callback to parse Server Sent Events (SSE) from raw InputStream and
42-
* emit the events with io.reactivex.FlowableEmitter to allow streaming of
43-
* SSE.
42+
* emit the events with ProducerScope to allow streaming of SSE.
4443
*/
45-
class ResponseBodyCallback(private val emitter: FlowableEmitter<SSE>, private val emitDone: Boolean) : Callback {
44+
class ResponseBodyCallback(private val emitter: ProducerScope<SSE>, private val emitDone: Boolean) : Callback {
4645
val logger = logger<ResponseBodyCallback>()
4746

4847
override fun onResponse(call: Call, response: Response) {
@@ -59,7 +58,7 @@ class ResponseBodyCallback(private val emitter: FlowableEmitter<SSE>, private va
5958
reader = BufferedReader(InputStreamReader(inputStream, StandardCharsets.UTF_8))
6059
var line: String? = null
6160
var sse: SSE? = null
62-
while (!emitter.isCancelled && reader.readLine().also { line = it } != null) {
61+
while (!emitter.isClosedForSend && reader.readLine().also { line = it } != null) {
6362
sse = when {
6463
line!!.startsWith("data:") -> {
6564
val data = line!!.substring(5).trim { it <= ' ' }
@@ -69,11 +68,11 @@ class ResponseBodyCallback(private val emitter: FlowableEmitter<SSE>, private va
6968
line == "" && sse != null -> {
7069
if (sse.isDone) {
7170
if (emitDone) {
72-
emitter.onNext(sse)
71+
emitter.trySend(sse)
7372
}
7473
break
7574
}
76-
emitter.onNext(sse)
75+
emitter.trySend(sse)
7776
null
7877
}
7978
// starts with event:
@@ -82,8 +81,8 @@ class ResponseBodyCallback(private val emitter: FlowableEmitter<SSE>, private va
8281
val eventName = line!!.substring(6).trim { it <= ' ' }
8382
if (eventName == "ping") {
8483
// skip ping event and data
85-
emitter.onNext(sse ?: SSE(""))
86-
emitter.onNext(sse ?: SSE(""))
84+
emitter.trySend(sse ?: SSE(""))
85+
emitter.trySend(sse ?: SSE(""))
8786
}
8887

8988
null
@@ -108,8 +107,8 @@ class ResponseBodyCallback(private val emitter: FlowableEmitter<SSE>, private va
108107
}
109108

110109
line.startsWith("{") && line.endsWith("}") -> {
111-
emitter.onNext(SSE(line))
112-
emitter.onComplete()
110+
emitter.trySend(SSE(line))
111+
emitter.close()
113112
return
114113
}
115114

@@ -121,7 +120,7 @@ class ResponseBodyCallback(private val emitter: FlowableEmitter<SSE>, private va
121120
}
122121
}
123122

124-
emitter.onComplete()
123+
emitter.close()
125124
} catch (t: Throwable) {
126125
logger<ResponseBodyCallback>().error("Error while reading SSE", t)
127126
logger<ResponseBodyCallback>().error("Request: ${call.request()}")
@@ -138,6 +137,6 @@ class ResponseBodyCallback(private val emitter: FlowableEmitter<SSE>, private va
138137
}
139138

140139
override fun onFailure(call: Call, e: IOException) {
141-
emitter.onError(e)
140+
emitter.close(e)
142141
}
143142
}

core/src/main/kotlin/cc/unitmesh/devti/mcp/editor/McpPreviewEditorProvider.kt

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,18 @@
11
package cc.unitmesh.devti.mcp.editor
22

3-
import com.intellij.openapi.fileEditor.AsyncFileEditorProvider
43
import com.intellij.openapi.fileEditor.FileEditor
54
import com.intellij.openapi.fileEditor.FileEditorPolicy
65
import com.intellij.openapi.fileEditor.WeighedFileEditorProvider
76
import com.intellij.openapi.project.Project
87
import com.intellij.openapi.vfs.VirtualFile
98

10-
class McpPreviewEditorProvider : WeighedFileEditorProvider(), AsyncFileEditorProvider {
9+
class McpPreviewEditorProvider : WeighedFileEditorProvider() {
1110
override fun accept(project: Project, file: VirtualFile) = file.name.contains(".mcp.json")
1211

1312
override fun createEditor(project: Project, virtualFile: VirtualFile): FileEditor {
1413
return McpPreviewEditor(project, virtualFile)
1514
}
1615

17-
override fun createEditorAsync(project: Project, file: VirtualFile): AsyncFileEditorProvider.Builder {
18-
return object : AsyncFileEditorProvider.Builder() {
19-
override fun build(): FileEditor {
20-
return McpPreviewEditor(project, file)
21-
}
22-
}
23-
}
24-
2516
override fun getEditorTypeId(): String = "mcp-preview-editor"
2617

2718
override fun getPolicy(): FileEditorPolicy = FileEditorPolicy.PLACE_AFTER_DEFAULT_EDITOR

exts/devins-lang/src/main/kotlin/cc/unitmesh/devti/language/debugger/editor/ShirePreviewEditorProvider.kt

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
package cc.unitmesh.devti.language.debugger.editor
22

33
import cc.unitmesh.devti.language.DevInFileType
4-
import com.intellij.openapi.fileEditor.AsyncFileEditorProvider
54
import com.intellij.openapi.fileEditor.FileEditor
65
import com.intellij.openapi.fileEditor.FileEditorPolicy
76
import com.intellij.openapi.fileEditor.WeighedFileEditorProvider
87
import com.intellij.openapi.fileTypes.FileTypeRegistry
98
import com.intellij.openapi.project.Project
109
import com.intellij.openapi.vfs.VirtualFile
1110

12-
class ShirePreviewEditorProvider : WeighedFileEditorProvider(), AsyncFileEditorProvider {
11+
class ShirePreviewEditorProvider : WeighedFileEditorProvider() {
1312
override fun accept(project: Project, file: VirtualFile): Boolean {
1413
return FileTypeRegistry.getInstance().isFileOfType(file, DevInFileType.INSTANCE)
1514
}
@@ -18,14 +17,6 @@ class ShirePreviewEditorProvider : WeighedFileEditorProvider(), AsyncFileEditorP
1817
return ShirePreviewEditor(project, virtualFile)
1918
}
2019

21-
override fun createEditorAsync(project: Project, file: VirtualFile): AsyncFileEditorProvider.Builder {
22-
return object : AsyncFileEditorProvider.Builder() {
23-
override fun build(): FileEditor {
24-
return ShirePreviewEditor(project, file)
25-
}
26-
}
27-
}
28-
2920
override fun getEditorTypeId(): String = "shire-preview-editor"
3021

3122
override fun getPolicy(): FileEditorPolicy = FileEditorPolicy.PLACE_AFTER_DEFAULT_EDITOR

mpp-idea/gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@ org.gradle.caching = true
1414
kotlin.stdlib.default.dependency = false
1515

1616
# MPP Version
17-
mppVersion = 0.3.5
17+
mppVersion = 0.3.6
1818

0 commit comments

Comments
 (0)