From 67306dbf4265fb588d3ff37231683c67929dcd83 Mon Sep 17 00:00:00 2001 From: Jeremy Schoffen Date: Tue, 25 Nov 2025 00:37:30 +0100 Subject: [PATCH 1/4] Fix: typos, forgotten print statements, dangling bracket --- .../starfederation/datastar/clojure/adapter/common.clj | 2 +- src/bb-example/src/main/bb_example/animation/handlers.clj | 7 ------- src/bb-example/src/main/bb_example/animation/rendering.clj | 2 +- src/dev/examples/animation_gzip.clj | 3 --- src/dev/examples/animation_gzip/rendering.clj | 3 +-- 5 files changed, 3 insertions(+), 14 deletions(-) diff --git a/libraries/sdk/src/main/starfederation/datastar/clojure/adapter/common.clj b/libraries/sdk/src/main/starfederation/datastar/clojure/adapter/common.clj index 0fd6493..d65b05a 100644 --- a/libraries/sdk/src/main/starfederation/datastar/clojure/adapter/common.clj +++ b/libraries/sdk/src/main/starfederation/datastar/clojure/adapter/common.clj @@ -405,7 +405,7 @@ (defn default-on-exception "Default [[on-exception]] callback, it returns `true` on [[IOException]] which - closes, the generator. It rethrows the exception wrapped with `ex-infor` + closes, the generator. It rethrows the exception wrapped with `ex-info` otherwise." [_sse e ctx] (if (instance? IOException e) diff --git a/src/bb-example/src/main/bb_example/animation/handlers.clj b/src/bb-example/src/main/bb_example/animation/handlers.clj index dc5c3c1..bae967e 100644 --- a/src/bb-example/src/main/bb_example/animation/handlers.clj +++ b/src/bb-example/src/main/bb_example/animation/handlers.clj @@ -44,7 +44,6 @@ (defn ping-handler ([req] (when-let [coords (recover-coords req)] - (println "-- ping " coords) (state/add-ping! coords)) {:status 204}) ([req respond _raise] @@ -53,7 +52,6 @@ (defn random-pings-handler ([_req] - (println "-- add pixels") (state/add-random-pings!) {:status 204}) ([req respond _raise] @@ -63,7 +61,6 @@ (defn reset-handler ([_req] - (println "-- reseting state") (state/reset-state!) {:status 204}) ([req respond _raise] @@ -71,7 +68,6 @@ (defn step-handler ([_req] - (println "-- Step 1") (state/step-state!) {:status 204}) ([req respond _raise] @@ -82,7 +78,6 @@ (defn play-handler ([_req] - (println "-- play animation") (state/start-animating!) {:status 204}) ([req respond _raise] @@ -91,7 +86,6 @@ (defn pause-handler ([_req] - (println "-- pause animation") (state/stop-animating!) {:status 204}) ([req respond _raise] @@ -100,7 +94,6 @@ (defn resize-handler ([req] (let [{x "rows" y "columns"} (c/get-signals req)] - (println "-- resize" x y) (state/resize! x y) {:status 204})) ([req respond _raise] diff --git a/src/bb-example/src/main/bb_example/animation/rendering.clj b/src/bb-example/src/main/bb_example/animation/rendering.clj index 5c98a7c..6237f5a 100644 --- a/src/bb-example/src/main/bb_example/animation/rendering.clj +++ b/src/bb-example/src/main/bb_example/animation/rendering.clj @@ -57,7 +57,7 @@ (defn cell-style [v] - (str "background-color: "v";}")) + (str "background-color: "v";")) (def on-click diff --git a/src/dev/examples/animation_gzip.clj b/src/dev/examples/animation_gzip.clj index 8519caf..499be23 100644 --- a/src/dev/examples/animation_gzip.clj +++ b/src/dev/examples/animation_gzip.clj @@ -85,6 +85,3 @@ (u/clear-terminal!) (u/reboot-hk-server! #'handler-http-kit) (u/reboot-jetty-server! #'handler-ring {:async? true})) - - - diff --git a/src/dev/examples/animation_gzip/rendering.clj b/src/dev/examples/animation_gzip/rendering.clj index fb8f03c..0c0c661 100644 --- a/src/dev/examples/animation_gzip/rendering.clj +++ b/src/dev/examples/animation_gzip/rendering.clj @@ -57,9 +57,8 @@ (defn rgb [r g b] (str "rgb(" r ", " g ", " b")")) - (defn cell-style [v] - (str "background-color: "v";}")) + (str "background-color: "v";")) (def on-click From 0b66653aa9b2c2c75de99678f52066f96a78dbd9 Mon Sep 17 00:00:00 2001 From: Jeremy Schoffen Date: Tue, 25 Nov 2025 01:06:50 +0100 Subject: [PATCH 2/4] Refactor: getting rid of the convoluted logic in the jetty implementation The previous implementation used a complicated logic involving locks to protect against an api usage that should not happen. The protection is now removed, the code is much simpler. --- .../datastar/clojure/adapter/ring.clj | 18 +-- .../datastar/clojure/adapter/ring/impl.clj | 52 +++----- .../clojure/adapter/ring/impl_test.clj | 114 +----------------- 3 files changed, 33 insertions(+), 151 deletions(-) diff --git a/libraries/sdk-ring/src/main/starfederation/datastar/clojure/adapter/ring.clj b/libraries/sdk-ring/src/main/starfederation/datastar/clojure/adapter/ring.clj index f58de0b..f5e94f2 100644 --- a/libraries/sdk-ring/src/main/starfederation/datastar/clojure/adapter/ring.clj +++ b/libraries/sdk-ring/src/main/starfederation/datastar/clojure/adapter/ring.clj @@ -23,12 +23,18 @@ "Returns a ring response that will start a SSE stream. The status code will be either 200 or the user provided one. + Specific SSE headers are set automatically, the user provided ones will be merged. The response body is a sse generator implementing `ring.core.protocols/StreamableResponseBody`. + + The body is a special value that is part of the SSE machinery and should not + be used directly. In other words you must only interact with the SSEGen + provided as argument of the different callbacks described below. + In sync mode, the connection is closed automatically when the handler is - done running. You need to explicitely close it in rinc async. + done running. You need to explicitly close it in rinc async. Opts: - `:status`: status for the HTTP response, defaults to 200 @@ -52,9 +58,7 @@ " [ring-request {:keys [status] :as opts}] {:pre [(ac/on-open opts)]} - (let [sse-gen (impl/->sse-gen)] - {:status (or status 200) - :headers (ac/headers ring-request opts) - :body sse-gen - ::impl/opts opts})) - + {:status (or status 200) + :headers (ac/headers ring-request opts) + :body (impl/->sse-gen) + ::impl/opts opts}) diff --git a/libraries/sdk-ring/src/main/starfederation/datastar/clojure/adapter/ring/impl.clj b/libraries/sdk-ring/src/main/starfederation/datastar/clojure/adapter/ring/impl.clj index 5048433..ce5bfc8 100644 --- a/libraries/sdk-ring/src/main/starfederation/datastar/clojure/adapter/ring/impl.clj +++ b/libraries/sdk-ring/src/main/starfederation/datastar/clojure/adapter/ring/impl.clj @@ -34,42 +34,22 @@ ^:unsynchronized-mutable on-exception] rp/StreamableResponseBody (write-body-to-stream [this response output-stream] - (.lock lock) + (let [opts (::opts response) + on-open (ac/on-open opts)] - ;; already initialized, unlock and throw, we are out - (when send! - (.unlock lock) - (throw (ex-info "Reused SSE-gen as several ring responses body. Don't do this." {}))) + ;; Set the SSEGenerator's state + (set! send! (->send output-stream opts)) + (set! on-exception (or (ac/on-exception opts) + ac/default-on-exception)) + (when-let [cb (ac/on-close opts)] + (set! on-close cb)) - (let [!error (volatile! nil)] - (try - ;; initializing the internal state - (let [opts (::opts response)] - (set! send! (->send output-stream opts)) - (set! on-exception (or (ac/on-exception opts) - ac/default-on-exception)) - (when-let [cb (ac/on-close opts)] - (set! on-close cb))) + ;; flush the HTTP headers as soon as possible + (.flush ^OutputStream output-stream) + + ;; Call on-open + (on-open this))) - ;; flush the HTTP headers - (.flush ^OutputStream output-stream) - true ;; dummy return - ;; We catch everything here, if not a Throwable may pass through - ;; !error won't catch it, on-open would be called - (catch Throwable t - (vreset! !error t)) - (finally - ;; Any exception should have been caught, - ;; the setup the internal state is done, - ;; the HTTP headers are sent - ;; we can now release the lock now - (.unlock lock) - (if-let [e @!error] - (throw e) ;; if error throw, the lock is already released - ;; if all is ok call on-open, it can safely throw... - (when-let [on-open (-> response ::opts ac/on-open)] - (on-open this))))))) - p/SSEGenerator (send-event! [this event-type data-lines opts] (u/lock! lock @@ -109,10 +89,10 @@ (close [this] (p/close-sse! this))) - -(defn ->sse-gen [] +(defn ->sse-gen + {:tag SSEGenerator} + [] (SSEGenerator. nil (ReentrantLock.) nil nil)) - diff --git a/src/test/adapter-ring/starfederation/datastar/clojure/adapter/ring/impl_test.clj b/src/test/adapter-ring/starfederation/datastar/clojure/adapter/ring/impl_test.clj index 4b59520..9d0ad02 100644 --- a/src/test/adapter-ring/starfederation/datastar/clojure/adapter/ring/impl_test.clj +++ b/src/test/adapter-ring/starfederation/datastar/clojure/adapter/ring/impl_test.clj @@ -10,108 +10,6 @@ (:import [java.io ByteArrayOutputStream])) -;; ----------------------------------------------------------------------------- -;; Testing the gnarly adapter setup by simulating it -;; ----------------------------------------------------------------------------- -(defn ->lock [] (volatile! 0)) -(defn lock! [!l] (vswap! !l inc)) -(defn unlock! [!l] (vswap! !l dec)) - - -(defn throw-key [k] (throw (ex-info "" {:k k}))) -(defn throw-already-used [] (throw-key :already-used-error)) -(defn throw-writter-error [] (throw-key :writer-ctr-error)) -(defn throw-flush-errror [] (throw-key :flush-error)) -(defn throw-on-open [] (throw-key :on-open-error)) - - -(defn set-writer! [!ref behavior] - (case behavior - :ok (vreset! !ref :new-writer) - (throw-writter-error))) - -(defn flush-headers! [v] - (case v - :ok nil - (throw-flush-errror))) - -(defn on-open [v] - (case v - :ok nil - (throw-on-open))) - - -(defn write-body-to-stream-simulation - "Here we mimick the behavior of the initialisation of ring SSE responses - We capture lock, internal state, errors and return value to check several - properties." - [writer set-writer-v flush-v on-open-v] - (let [!l (->lock) - !writer (volatile! :old-writer) - !return (volatile! nil)] - - ;; The code actually mimicking - (try - (lock! !l) - (when writer - (unlock! !l) - (throw-already-used)) - - (let [!error (volatile! nil)] - (try - (set-writer! !writer set-writer-v) - (flush-headers! flush-v) - (vreset! !return :success) - (catch Throwable t - (vreset! !error t)) - (finally - (unlock! !l) - (if-let [e @!error] - (throw e) - (on-open on-open-v))))) - (catch Throwable t - (vreset! !return t))) - - {:lock @!l - :writer @!writer - :return (let [r @!return] - (or - (-> r ex-data :k) - r))})) - - -(defn make-all-cases [] - (for [writer [nil :old-writer] - set-writer [:ok :throw] - flush [:ok :throw] - on-open [:ok :throw]] - [writer set-writer flush on-open])) - - -(defn expected [writer set-writer! flush on-open] - (cond - writer {:lock 0 :writer :old-writer :return :already-used-error} - (= set-writer! :throw) {:lock 0 :writer :old-writer :return :writer-ctr-error} - (= flush :throw) {:lock 0 :writer :new-writer :return :flush-error} - (= on-open :throw) {:lock 0 :writer :new-writer :return :on-open-error} - :else {:lock 0 :writer :new-writer :return :success})) - - -(defn run-test-case [test-case] - {:res (apply write-body-to-stream-simulation test-case) - :expected (apply expected test-case)}) - - -(defn case-coherent? [test-case] - (let [res (run-test-case test-case)] - (= (:res res) (:expected res)))) - - -(defdescribe simulate-write-body-to-stream - (it "manages locks and errors properly" - (doseq [test-case (make-all-cases)] - (expect (case-coherent? test-case) (str test-case))))) - ;; ----------------------------------------------------------------------------- ;; Basic sending of a SSE event without any server @@ -120,7 +18,8 @@ (d*/patch-elements! (at/->sse-gen) "msg")) (defn send-SSE-event [response] - (let [baos (ByteArrayOutputStream.)] + (let [response (assoc-in response [::impl/opts ac/on-open] (fn [_sse])) + baos (ByteArrayOutputStream.)] (with-open [sse-gen (impl/->sse-gen) baos baos] (p/write-body-to-stream sse-gen response baos) @@ -133,23 +32,22 @@ (defdescribe simple-test (it "can send events with a using temp buffers" (send-SSE-event {})) - + (it "can send events with a using a persistent buffered reader" (send-SSE-event {::impl/opts {ac/write-profile ac/buffered-writer-profile}})) (it "can send gziped events with a using temp buffers" (send-SSE-event {::impl/opts {ac/write-profile ac/gzip-profile :gzip? true}})) - + (it "can send gziped events with a using a persistent buffered reader" (send-SSE-event {::impl/opts {ac/write-profile ac/gzip-buffered-writer-profile :gzip? true}}))) - + (comment + (user/reload!) (require '[lazytest.repl :as ltr]) - (ltr/run-test-var #'simulate-write-body-to-stream) (ltr/run-test-var #'simple-test)) - From f54cd64cd46aeb1395179f18f470629c21ebaa33 Mon Sep 17 00:00:00 2001 From: Jeremy Schoffen Date: Fri, 28 Nov 2025 00:01:42 +0100 Subject: [PATCH 3/4] Chore: changelog --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d298893..4187bb5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,11 @@ - Fixed `starfederation.datastar.clojure.adapter.http-kit2/wrap-start-responding`, the async arity was improperly managed. +### Changes +- The internals of the Ring SSE generator have been reworked. The SSE gen won't error + if a user reuses it for different requests anymore. Documentation is in place to warn + against such reuse and this change makes for much simpler code. + ## 2025-10-30 - RC4 This version's purpose is to transition the SDK to the new Datastar attribute From b5d969032532ebd03cc4da92cf9442bcc318e8ec Mon Sep 17 00:00:00 2001 From: Jeremy Schoffen Date: Fri, 28 Nov 2025 13:17:36 +0100 Subject: [PATCH 4/4] Chore: better comment --- .../main/starfederation/datastar/clojure/adapter/ring/impl.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries/sdk-ring/src/main/starfederation/datastar/clojure/adapter/ring/impl.clj b/libraries/sdk-ring/src/main/starfederation/datastar/clojure/adapter/ring/impl.clj index ce5bfc8..63e9bba 100644 --- a/libraries/sdk-ring/src/main/starfederation/datastar/clojure/adapter/ring/impl.clj +++ b/libraries/sdk-ring/src/main/starfederation/datastar/clojure/adapter/ring/impl.clj @@ -47,7 +47,7 @@ ;; flush the HTTP headers as soon as possible (.flush ^OutputStream output-stream) - ;; Call on-open + ;; SSE connection is ready we call user code (on-open this))) p/SSEGenerator