Skip to content

Commit 3d1daf3

Browse files
committed
Refactor: getting rid of the convoluted logic in the jetty implementation
The previous implementation used a complicated logic involving locks to protect against an api usage that should not happen. The protection is now removed, the code is much simpler.
1 parent 0b90fc6 commit 3d1daf3

File tree

3 files changed

+33
-151
lines changed
  • libraries/sdk-ring/src/main/starfederation/datastar/clojure/adapter
  • src/test/adapter-ring/starfederation/datastar/clojure/adapter/ring

3 files changed

+33
-151
lines changed

libraries/sdk-ring/src/main/starfederation/datastar/clojure/adapter/ring.clj

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,18 @@
2323
"Returns a ring response that will start a SSE stream.
2424
2525
The status code will be either 200 or the user provided one.
26+
2627
Specific SSE headers are set automatically, the user provided ones will be
2728
merged. The response body is a sse generator implementing
2829
`ring.core.protocols/StreamableResponseBody`.
2930
31+
32+
The body is a special value that is part of the SSE machinery and should not
33+
be used directly. In other words you must only interact with the SSEGen
34+
provided as argument of the different callbacks described below.
35+
3036
In sync mode, the connection is closed automatically when the handler is
31-
done running. You need to explicitely close it in rinc async.
37+
done running. You need to explicitly close it in rinc async.
3238
3339
Opts:
3440
- `:status`: status for the HTTP response, defaults to 200
@@ -52,9 +58,7 @@
5258
"
5359
[ring-request {:keys [status] :as opts}]
5460
{:pre [(ac/on-open opts)]}
55-
(let [sse-gen (impl/->sse-gen)]
56-
{:status (or status 200)
57-
:headers (ac/headers ring-request opts)
58-
:body sse-gen
59-
::impl/opts opts}))
60-
61+
{:status (or status 200)
62+
:headers (ac/headers ring-request opts)
63+
:body (impl/->sse-gen)
64+
::impl/opts opts})

libraries/sdk-ring/src/main/starfederation/datastar/clojure/adapter/ring/impl.clj

Lines changed: 16 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -34,42 +34,22 @@
3434
^:unsynchronized-mutable on-exception]
3535
rp/StreamableResponseBody
3636
(write-body-to-stream [this response output-stream]
37-
(.lock lock)
37+
(let [opts (::opts response)
38+
on-open (ac/on-open opts)]
3839

39-
;; already initialized, unlock and throw, we are out
40-
(when send!
41-
(.unlock lock)
42-
(throw (ex-info "Reused SSE-gen as several ring responses body. Don't do this." {})))
40+
;; Set the SSEGenerator's state
41+
(set! send! (->send output-stream opts))
42+
(set! on-exception (or (ac/on-exception opts)
43+
ac/default-on-exception))
44+
(when-let [cb (ac/on-close opts)]
45+
(set! on-close cb))
4346

44-
(let [!error (volatile! nil)]
45-
(try
46-
;; initializing the internal state
47-
(let [opts (::opts response)]
48-
(set! send! (->send output-stream opts))
49-
(set! on-exception (or (ac/on-exception opts)
50-
ac/default-on-exception))
51-
(when-let [cb (ac/on-close opts)]
52-
(set! on-close cb)))
47+
;; flush the HTTP headers as soon as possible
48+
(.flush ^OutputStream output-stream)
49+
50+
;; Call on-open
51+
(on-open this)))
5352

54-
;; flush the HTTP headers
55-
(.flush ^OutputStream output-stream)
56-
true ;; dummy return
57-
;; We catch everything here, if not a Throwable may pass through
58-
;; !error won't catch it, on-open would be called
59-
(catch Throwable t
60-
(vreset! !error t))
61-
(finally
62-
;; Any exception should have been caught,
63-
;; the setup the internal state is done,
64-
;; the HTTP headers are sent
65-
;; we can now release the lock now
66-
(.unlock lock)
67-
(if-let [e @!error]
68-
(throw e) ;; if error throw, the lock is already released
69-
;; if all is ok call on-open, it can safely throw...
70-
(when-let [on-open (-> response ::opts ac/on-open)]
71-
(on-open this)))))))
72-
7353
p/SSEGenerator
7454
(send-event! [this event-type data-lines opts]
7555
(u/lock! lock
@@ -109,10 +89,10 @@
10989
(close [this]
11090
(p/close-sse! this)))
11191

112-
113-
(defn ->sse-gen []
92+
(defn ->sse-gen
93+
{:tag SSEGenerator}
94+
[]
11495
(SSEGenerator. nil
11596
(ReentrantLock.)
11697
nil
11798
nil))
118-

src/test/adapter-ring/starfederation/datastar/clojure/adapter/ring/impl_test.clj

Lines changed: 6 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -10,108 +10,6 @@
1010
(:import
1111
[java.io ByteArrayOutputStream]))
1212

13-
;; -----------------------------------------------------------------------------
14-
;; Testing the gnarly adapter setup by simulating it
15-
;; -----------------------------------------------------------------------------
16-
(defn ->lock [] (volatile! 0))
17-
(defn lock! [!l] (vswap! !l inc))
18-
(defn unlock! [!l] (vswap! !l dec))
19-
20-
21-
(defn throw-key [k] (throw (ex-info "" {:k k})))
22-
(defn throw-already-used [] (throw-key :already-used-error))
23-
(defn throw-writter-error [] (throw-key :writer-ctr-error))
24-
(defn throw-flush-errror [] (throw-key :flush-error))
25-
(defn throw-on-open [] (throw-key :on-open-error))
26-
27-
28-
(defn set-writer! [!ref behavior]
29-
(case behavior
30-
:ok (vreset! !ref :new-writer)
31-
(throw-writter-error)))
32-
33-
(defn flush-headers! [v]
34-
(case v
35-
:ok nil
36-
(throw-flush-errror)))
37-
38-
(defn on-open [v]
39-
(case v
40-
:ok nil
41-
(throw-on-open)))
42-
43-
44-
(defn write-body-to-stream-simulation
45-
"Here we mimick the behavior of the initialisation of ring SSE responses
46-
We capture lock, internal state, errors and return value to check several
47-
properties."
48-
[writer set-writer-v flush-v on-open-v]
49-
(let [!l (->lock)
50-
!writer (volatile! :old-writer)
51-
!return (volatile! nil)]
52-
53-
;; The code actually mimicking
54-
(try
55-
(lock! !l)
56-
(when writer
57-
(unlock! !l)
58-
(throw-already-used))
59-
60-
(let [!error (volatile! nil)]
61-
(try
62-
(set-writer! !writer set-writer-v)
63-
(flush-headers! flush-v)
64-
(vreset! !return :success)
65-
(catch Throwable t
66-
(vreset! !error t))
67-
(finally
68-
(unlock! !l)
69-
(if-let [e @!error]
70-
(throw e)
71-
(on-open on-open-v)))))
72-
(catch Throwable t
73-
(vreset! !return t)))
74-
75-
{:lock @!l
76-
:writer @!writer
77-
:return (let [r @!return]
78-
(or
79-
(-> r ex-data :k)
80-
r))}))
81-
82-
83-
(defn make-all-cases []
84-
(for [writer [nil :old-writer]
85-
set-writer [:ok :throw]
86-
flush [:ok :throw]
87-
on-open [:ok :throw]]
88-
[writer set-writer flush on-open]))
89-
90-
91-
(defn expected [writer set-writer! flush on-open]
92-
(cond
93-
writer {:lock 0 :writer :old-writer :return :already-used-error}
94-
(= set-writer! :throw) {:lock 0 :writer :old-writer :return :writer-ctr-error}
95-
(= flush :throw) {:lock 0 :writer :new-writer :return :flush-error}
96-
(= on-open :throw) {:lock 0 :writer :new-writer :return :on-open-error}
97-
:else {:lock 0 :writer :new-writer :return :success}))
98-
99-
100-
(defn run-test-case [test-case]
101-
{:res (apply write-body-to-stream-simulation test-case)
102-
:expected (apply expected test-case)})
103-
104-
105-
(defn case-coherent? [test-case]
106-
(let [res (run-test-case test-case)]
107-
(= (:res res) (:expected res))))
108-
109-
110-
(defdescribe simulate-write-body-to-stream
111-
(it "manages locks and errors properly"
112-
(doseq [test-case (make-all-cases)]
113-
(expect (case-coherent? test-case) (str test-case)))))
114-
11513

11614
;; -----------------------------------------------------------------------------
11715
;; Basic sending of a SSE event without any server
@@ -120,7 +18,8 @@
12018
(d*/patch-elements! (at/->sse-gen) "msg"))
12119

12220
(defn send-SSE-event [response]
123-
(let [baos (ByteArrayOutputStream.)]
21+
(let [response (assoc-in response [::impl/opts ac/on-open] (fn [_sse]))
22+
baos (ByteArrayOutputStream.)]
12423
(with-open [sse-gen (impl/->sse-gen)
12524
baos baos]
12625
(p/write-body-to-stream sse-gen response baos)
@@ -133,23 +32,22 @@
13332
(defdescribe simple-test
13433
(it "can send events with a using temp buffers"
13534
(send-SSE-event {}))
136-
35+
13736
(it "can send events with a using a persistent buffered reader"
13837
(send-SSE-event {::impl/opts {ac/write-profile ac/buffered-writer-profile}}))
13938

14039
(it "can send gziped events with a using temp buffers"
14140
(send-SSE-event {::impl/opts {ac/write-profile ac/gzip-profile
14241
:gzip? true}}))
143-
42+
14443
(it "can send gziped events with a using a persistent buffered reader"
14544
(send-SSE-event {::impl/opts {ac/write-profile ac/gzip-buffered-writer-profile
14645
:gzip? true}})))
147-
46+
14847

14948

15049

15150
(comment
51+
(user/reload!)
15252
(require '[lazytest.repl :as ltr])
153-
(ltr/run-test-var #'simulate-write-body-to-stream)
15453
(ltr/run-test-var #'simple-test))
155-

0 commit comments

Comments
 (0)