Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
- Fixed `starfederation.datastar.clojure.adapter.http-kit2/wrap-start-responding`,
the async arity was improperly managed.

### Changes
- The internals of the Ring SSE generator have been reworked. The SSE gen won't error
if a user reuses it for different requests anymore. Documentation is in place to warn
against such reuse and this change makes for much simpler code.

## 2025-10-30 - RC4

This version's purpose is to transition the SDK to the new Datastar attribute
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,18 @@
"Returns a ring response that will start a SSE stream.

The status code will be either 200 or the user provided one.

Specific SSE headers are set automatically, the user provided ones will be
merged. The response body is a sse generator implementing
`ring.core.protocols/StreamableResponseBody`.


The body is a special value that is part of the SSE machinery and should not
be used directly. In other words you must only interact with the SSEGen
provided as argument of the different callbacks described below.

In sync mode, the connection is closed automatically when the handler is
done running. You need to explicitely close it in rinc async.
done running. You need to explicitly close it in rinc async.

Opts:
- `:status`: status for the HTTP response, defaults to 200
Expand All @@ -52,9 +58,7 @@
"
[ring-request {:keys [status] :as opts}]
{:pre [(ac/on-open opts)]}
(let [sse-gen (impl/->sse-gen)]
{:status (or status 200)
:headers (ac/headers ring-request opts)
:body sse-gen
::impl/opts opts}))

{:status (or status 200)
:headers (ac/headers ring-request opts)
:body (impl/->sse-gen)
::impl/opts opts})
Original file line number Diff line number Diff line change
Expand Up @@ -34,42 +34,22 @@
^:unsynchronized-mutable on-exception]
rp/StreamableResponseBody
(write-body-to-stream [this response output-stream]
(.lock lock)
(let [opts (::opts response)
on-open (ac/on-open opts)]

;; already initialized, unlock and throw, we are out
(when send!
(.unlock lock)
(throw (ex-info "Reused SSE-gen as several ring responses body. Don't do this." {})))
;; Set the SSEGenerator's state
(set! send! (->send output-stream opts))
(set! on-exception (or (ac/on-exception opts)
ac/default-on-exception))
(when-let [cb (ac/on-close opts)]
(set! on-close cb))

(let [!error (volatile! nil)]
(try
;; initializing the internal state
(let [opts (::opts response)]
(set! send! (->send output-stream opts))
(set! on-exception (or (ac/on-exception opts)
ac/default-on-exception))
(when-let [cb (ac/on-close opts)]
(set! on-close cb)))
;; flush the HTTP headers as soon as possible
(.flush ^OutputStream output-stream)

;; SSE connection is ready we call user code
(on-open this)))

;; flush the HTTP headers
(.flush ^OutputStream output-stream)
true ;; dummy return
;; We catch everything here, if not a Throwable may pass through
;; !error won't catch it, on-open would be called
(catch Throwable t
(vreset! !error t))
(finally
;; Any exception should have been caught,
;; the setup the internal state is done,
;; the HTTP headers are sent
;; we can now release the lock now
(.unlock lock)
(if-let [e @!error]
(throw e) ;; if error throw, the lock is already released
;; if all is ok call on-open, it can safely throw...
(when-let [on-open (-> response ::opts ac/on-open)]
(on-open this)))))))

p/SSEGenerator
(send-event! [this event-type data-lines opts]
(u/lock! lock
Expand Down Expand Up @@ -109,10 +89,10 @@
(close [this]
(p/close-sse! this)))


(defn ->sse-gen []
(defn ->sse-gen
{:tag SSEGenerator}
[]
(SSEGenerator. nil
(ReentrantLock.)
nil
nil))

Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@

(defn default-on-exception
"Default [[on-exception]] callback, it returns `true` on [[IOException]] which
closes, the generator. It rethrows the exception wrapped with `ex-infor`
closes, the generator. It rethrows the exception wrapped with `ex-info`
otherwise."
[_sse e ctx]
(if (instance? IOException e)
Expand Down
7 changes: 0 additions & 7 deletions src/bb-example/src/main/bb_example/animation/handlers.clj
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
(defn ping-handler
([req]
(when-let [coords (recover-coords req)]
(println "-- ping " coords)
(state/add-ping! coords))
{:status 204})
([req respond _raise]
Expand All @@ -53,7 +52,6 @@

(defn random-pings-handler
([_req]
(println "-- add pixels")
(state/add-random-pings!)
{:status 204})
([req respond _raise]
Expand All @@ -63,15 +61,13 @@

(defn reset-handler
([_req]
(println "-- reseting state")
(state/reset-state!)
{:status 204})
([req respond _raise]
(respond (reset-handler req))))

(defn step-handler
([_req]
(println "-- Step 1")
(state/step-state!)
{:status 204})
([req respond _raise]
Expand All @@ -82,7 +78,6 @@

(defn play-handler
([_req]
(println "-- play animation")
(state/start-animating!)
{:status 204})
([req respond _raise]
Expand All @@ -91,7 +86,6 @@

(defn pause-handler
([_req]
(println "-- pause animation")
(state/stop-animating!)
{:status 204})
([req respond _raise]
Expand All @@ -100,7 +94,6 @@
(defn resize-handler
([req]
(let [{x "rows" y "columns"} (c/get-signals req)]
(println "-- resize" x y)
(state/resize! x y)
{:status 204}))
([req respond _raise]
Expand Down
2 changes: 1 addition & 1 deletion src/bb-example/src/main/bb_example/animation/rendering.clj
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@


(defn cell-style [v]
(str "background-color: "v";}"))
(str "background-color: "v";"))


(def on-click
Expand Down
3 changes: 0 additions & 3 deletions src/dev/examples/animation_gzip.clj
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,3 @@
(u/clear-terminal!)
(u/reboot-hk-server! #'handler-http-kit)
(u/reboot-jetty-server! #'handler-ring {:async? true}))



3 changes: 1 addition & 2 deletions src/dev/examples/animation_gzip/rendering.clj
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,8 @@
(defn rgb [r g b]
(str "rgb(" r ", " g ", " b")"))


(defn cell-style [v]
(str "background-color: "v";}"))
(str "background-color: "v";"))


(def on-click
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,108 +10,6 @@
(:import
[java.io ByteArrayOutputStream]))

;; -----------------------------------------------------------------------------
;; Testing the gnarly adapter setup by simulating it
;; -----------------------------------------------------------------------------
(defn ->lock [] (volatile! 0))
(defn lock! [!l] (vswap! !l inc))
(defn unlock! [!l] (vswap! !l dec))


(defn throw-key [k] (throw (ex-info "" {:k k})))
(defn throw-already-used [] (throw-key :already-used-error))
(defn throw-writter-error [] (throw-key :writer-ctr-error))
(defn throw-flush-errror [] (throw-key :flush-error))
(defn throw-on-open [] (throw-key :on-open-error))


(defn set-writer! [!ref behavior]
(case behavior
:ok (vreset! !ref :new-writer)
(throw-writter-error)))

(defn flush-headers! [v]
(case v
:ok nil
(throw-flush-errror)))

(defn on-open [v]
(case v
:ok nil
(throw-on-open)))


(defn write-body-to-stream-simulation
"Here we mimick the behavior of the initialisation of ring SSE responses
We capture lock, internal state, errors and return value to check several
properties."
[writer set-writer-v flush-v on-open-v]
(let [!l (->lock)
!writer (volatile! :old-writer)
!return (volatile! nil)]

;; The code actually mimicking
(try
(lock! !l)
(when writer
(unlock! !l)
(throw-already-used))

(let [!error (volatile! nil)]
(try
(set-writer! !writer set-writer-v)
(flush-headers! flush-v)
(vreset! !return :success)
(catch Throwable t
(vreset! !error t))
(finally
(unlock! !l)
(if-let [e @!error]
(throw e)
(on-open on-open-v)))))
(catch Throwable t
(vreset! !return t)))

{:lock @!l
:writer @!writer
:return (let [r @!return]
(or
(-> r ex-data :k)
r))}))


(defn make-all-cases []
(for [writer [nil :old-writer]
set-writer [:ok :throw]
flush [:ok :throw]
on-open [:ok :throw]]
[writer set-writer flush on-open]))


(defn expected [writer set-writer! flush on-open]
(cond
writer {:lock 0 :writer :old-writer :return :already-used-error}
(= set-writer! :throw) {:lock 0 :writer :old-writer :return :writer-ctr-error}
(= flush :throw) {:lock 0 :writer :new-writer :return :flush-error}
(= on-open :throw) {:lock 0 :writer :new-writer :return :on-open-error}
:else {:lock 0 :writer :new-writer :return :success}))


(defn run-test-case [test-case]
{:res (apply write-body-to-stream-simulation test-case)
:expected (apply expected test-case)})


(defn case-coherent? [test-case]
(let [res (run-test-case test-case)]
(= (:res res) (:expected res))))


(defdescribe simulate-write-body-to-stream
(it "manages locks and errors properly"
(doseq [test-case (make-all-cases)]
(expect (case-coherent? test-case) (str test-case)))))


;; -----------------------------------------------------------------------------
;; Basic sending of a SSE event without any server
Expand All @@ -120,7 +18,8 @@
(d*/patch-elements! (at/->sse-gen) "msg"))

(defn send-SSE-event [response]
(let [baos (ByteArrayOutputStream.)]
(let [response (assoc-in response [::impl/opts ac/on-open] (fn [_sse]))
baos (ByteArrayOutputStream.)]
(with-open [sse-gen (impl/->sse-gen)
baos baos]
(p/write-body-to-stream sse-gen response baos)
Expand All @@ -133,23 +32,22 @@
(defdescribe simple-test
(it "can send events with a using temp buffers"
(send-SSE-event {}))

(it "can send events with a using a persistent buffered reader"
(send-SSE-event {::impl/opts {ac/write-profile ac/buffered-writer-profile}}))

(it "can send gziped events with a using temp buffers"
(send-SSE-event {::impl/opts {ac/write-profile ac/gzip-profile
:gzip? true}}))

(it "can send gziped events with a using a persistent buffered reader"
(send-SSE-event {::impl/opts {ac/write-profile ac/gzip-buffered-writer-profile
:gzip? true}})))




(comment
(user/reload!)
(require '[lazytest.repl :as ltr])
(ltr/run-test-var #'simulate-write-body-to-stream)
(ltr/run-test-var #'simple-test))