Skip to content

Commit b92e19b

Browse files
committed
Feature: clojure-SDK alternative http-kit api
1 parent 78d62fa commit b92e19b

File tree

9 files changed

+567
-25
lines changed

9 files changed

+567
-25
lines changed

sdk/clojure/CHANGELOG.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,19 @@
2626

2727
### Added
2828

29+
- There is a new http-kit API that allows a more natural ring response model when
30+
using SSE. With the current API the status and headers for a response are
31+
sent directly while `->sse-response` is running, the `on-open` callback runs
32+
just after. For instance that any middleware that would add headers after the
33+
execution of the `->sse-response` function won't work, the initial response
34+
being already sent.
35+
The new `starfederation.datastar.clojure.adapter.http-kit2`
36+
API changes this behavior. In this new api the initial response is not sent
37+
during `->sse-response`. Instead a middleware takes care of sending it and
38+
only then calls the `on-open` callback. If this middleware is the last to run
39+
on the return any addition to the response map will be taken into account.
2940
- A new library providing Brotli write profile has been added.
41+
3042
## 2025-04-07
3143

3244
### Added
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
(ns starfederation.datastar.clojure.adapter.http-kit2
2+
(:require
3+
[org.httpkit.server :as hk-server]
4+
[starfederation.datastar.clojure.adapter.common :as ac]
5+
[starfederation.datastar.clojure.adapter.http-kit.impl :as impl]
6+
[starfederation.datastar.clojure.utils :refer [def-clone]]))
7+
8+
9+
(def-clone on-open ac/on-open)
10+
(def-clone on-close ac/on-close)
11+
(def-clone on-exception ac/on-exception)
12+
(def-clone default-on-exception ac/default-on-exception)
13+
14+
15+
(def-clone write-profile ac/write-profile)
16+
17+
(def-clone basic-profile impl/basic-profile)
18+
(def-clone buffered-writer-profile ac/buffered-writer-profile)
19+
(def-clone gzip-profile ac/gzip-profile)
20+
(def-clone gzip-buffered-writer-profile ac/gzip-buffered-writer-profile)
21+
22+
23+
(defn- as-channel
24+
"
25+
Replacement for [[hk-server/as-channel]] that doesn't deal with websockets
26+
and doen't call `on-open` itself.
27+
28+
`on-open` is meant to be called by either a middleware or an interceptor on the return.
29+
"
30+
[ring-req {:keys [on-close on-open init]}]
31+
32+
(when-let [ch (:async-channel ring-req)]
33+
34+
(when-let [f init] (f ch))
35+
(when-let [f on-close] (org.httpkit.server/on-close ch (partial f ch)))
36+
37+
{:body ch ::on-open on-open}))
38+
39+
40+
(defn ->sse-response
41+
"Make a Ring like response that will start a SSE stream.
42+
43+
The status code and the the SSE specific headers are not sent automatically.
44+
You need to use either [[start-responding-middleware]] or
45+
[[start-responding-interceptor]].
46+
47+
Note that the SSE connection stays opened util you close it.
48+
49+
General options:
50+
- `:status`: status for the HTTP response, defaults to 200.
51+
- `:headers`: ring headers map to add to the response.
52+
- [[on-open]]: mandatory callback called when the generator is ready to send.
53+
- [[on-close]]: callback called when the underlying Http-kit AsyncChannel is
54+
closed. It receives a second argument, the `:status-code` value we get from
55+
the closing AsyncChannel.
56+
- [[on-exception]]: callback called when sending a SSE event throws.
57+
- [[write-profile]]: write profile for the connection.
58+
Defaults to [[basic-profile]]
59+
60+
SDK provided write profiles:
61+
- [[basic-profile]]
62+
- [[buffered-writer-profile]]
63+
- [[gzip-profile]]
64+
- [[gzip-buffered-writer-profile]]
65+
66+
You can also take a look at the `starfederation.datastar.clojure.adapter.common`
67+
namespace if you want to write your own profiles.
68+
"
69+
[ring-request {:keys [status] :as opts}]
70+
{:pre [(ac/on-open opts)]}
71+
(let [on-open-cb (ac/on-open opts)
72+
on-close-cb (ac/on-close opts)
73+
future-send! (promise)
74+
future-gen (promise)]
75+
(assoc
76+
(as-channel ring-request
77+
{:on-open
78+
(fn [ch]
79+
(let [send! (impl/->send! ch opts)
80+
sse-gen (impl/->sse-gen ch send! opts)]
81+
(deliver future-gen sse-gen)
82+
(deliver future-send! send!)
83+
(on-open-cb sse-gen)))
84+
85+
:on-close
86+
(fn [_ status]
87+
(let [closing-res
88+
(ac/close-sse!
89+
#(when-let [send! (deref future-send! 0 nil)] (send!))
90+
#(when on-close-cb
91+
(on-close-cb (deref future-gen 0 nil) status)))]
92+
(if (instance? Exception closing-res)
93+
(throw closing-res)
94+
closing-res)))})
95+
:status (or status 200)
96+
:headers (ac/headers ring-request opts)
97+
::datastar-sse-response true)))
98+
99+
100+
(defn start-responding!
101+
"Function that takes a ring response map and sends HTTP status & headers using
102+
the [[AsyncChannel]] that should be in the body if the
103+
`::datastar-sse-response` key is present."
104+
[response]
105+
(if (::datastar-sse-response response)
106+
(let [{on-open ::on-open
107+
ch :body} response
108+
response (dissoc response :body ::on-open ::datastar-sse-response)]
109+
(hk-server/send! ch response false)
110+
(on-open ch))
111+
response))
112+
113+
114+
115+
(defn wrap-start-responding
116+
"Middleware necessary to use in conjunction with [[->sse-response]].
117+
118+
It will check if the response is a datastar-sse-response
119+
(created with [[->sse-response]]). In this case it will send the initial
120+
response containing headers and status code, then call `on-open`."
121+
[handler]
122+
(fn
123+
([req]
124+
(let [response (handler req)]
125+
(start-responding! response)
126+
response))
127+
([req respond _raise]
128+
(let [response (handler req)]
129+
(start-responding! response)
130+
(respond response)))))
131+
132+
133+
(def start-responding-middleware
134+
"Reitit middleware map for [[wrap-start-responding]]."
135+
{:name ::start-responding
136+
:wrap wrap-start-responding})
137+
138+
139+
(def start-responding-interceptor
140+
"
141+
Interceptor necessary to use in conjunction with [[->sse-response]].
142+
143+
In the `:leave` fn, it will check if the response is a datastar-sse-response
144+
(created with [[->sse-response]]). In this case it will send the initial
145+
response containing headers and status code, then call `on-open`."
146+
{:name ::start-responding
147+
:leave (fn [ctx]
148+
(let [response (:response ctx)]
149+
(start-responding! response)
150+
ctx))})
151+

sdk/clojure/src/dev/examples/animation_gzip.clj

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
(ns examples.animation-gzip
22
(:require
3-
[dev.onionpancakes.chassis.core :as h]
3+
[examples.animation-gzip.broadcast :as broadcast]
44
[examples.animation-gzip.handlers :as handlers]
55
[examples.animation-gzip.rendering :as rendering]
66
[examples.animation-gzip.state :as state]
@@ -20,28 +20,7 @@
2020
;; This example let's use play with fat updates and compression
2121
;; to get an idea of the gains compression can help use achieve
2222
;; in terms of network usage.
23-
24-
(defn send-frame! [sse frame]
25-
(try
26-
(d*/patch-elements! sse frame)
27-
(catch Exception e
28-
(println e))))
29-
30-
31-
(defn broadcast-new-frame! [frame]
32-
(let [sses @state/!conns]
33-
(doseq [sse sses]
34-
(send-frame! sse frame))))
35-
36-
37-
(defn install-watch! []
38-
(add-watch state/!state ::watch
39-
(fn [_k _ref old new]
40-
(when-not (identical? old new)
41-
(let [frame (rendering/render-content new)]
42-
(broadcast-new-frame! frame))))))
43-
44-
(install-watch!)
23+
(broadcast/install-watch!)
4524

4625

4726
(defn ->routes [->sse-response opts]
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
(ns examples.animation-gzip.broadcast
2+
(:require
3+
[examples.animation-gzip.rendering :as rendering]
4+
[examples.animation-gzip.state :as state]
5+
[starfederation.datastar.clojure.api :as d*]))
6+
7+
8+
(defn send-frame! [sse frame]
9+
(try
10+
(d*/patch-elements! sse frame)
11+
(catch Exception e
12+
(println e))))
13+
14+
15+
(defn broadcast-new-frame! [frame]
16+
(let [sses @state/!conns]
17+
(doseq [sse sses]
18+
(send-frame! sse frame))))
19+
20+
21+
(defn install-watch! []
22+
(add-watch state/!state ::watch
23+
(fn [_k _ref old new]
24+
(when-not (identical? old new)
25+
(let [frame (rendering/render-content new)]
26+
(broadcast-new-frame! frame))))))
27+
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
(ns examples.http-kit2.animation
2+
(:require
3+
[examples.animation-gzip.broadcast :as broadcast]
4+
[examples.animation-gzip.handlers :as handlers]
5+
[examples.animation-gzip.rendering :as rendering]
6+
[examples.animation-gzip.state :as state]
7+
[examples.common :as c]
8+
[examples.utils :as u]
9+
[reitit.ring :as rr]
10+
[reitit.ring.middleware.exception :as reitit-exception]
11+
[reitit.ring.middleware.parameters :as reitit-params]
12+
[starfederation.datastar.clojure.adapter.http-kit2 :as hk-gen]
13+
[starfederation.datastar.clojure.adapter.http-kit-schemas]
14+
[starfederation.datastar.clojure.adapter.ring :as ring-gen]
15+
[starfederation.datastar.clojure.adapter.ring-schemas]
16+
[starfederation.datastar.clojure.api-schemas]
17+
[starfederation.datastar.clojure.brotli :as brotli]))
18+
19+
;; This example let's use play with fat updates and compression
20+
;; to get an idea of the gains compression can help use achieve
21+
;; in terms of network usage.
22+
23+
(broadcast/install-watch!)
24+
25+
26+
(defn ->routes [->sse-response opts]
27+
[["/" handlers/home-handler]
28+
["/ping/:id" {:handler handlers/ping-handler
29+
:middleware [reitit-params/parameters-middleware]}]
30+
["/random-10" handlers/random-pings-handler]
31+
["/reset" handlers/reset-handler]
32+
["/step1" handlers/step-handler]
33+
["/play" handlers/play-handler]
34+
["/pause" handlers/pause-handler]
35+
["/updates" {:handler (handlers/->updates-handler ->sse-response opts)
36+
:middleware [[hk-gen/start-responding-middleware]]}]
37+
["/refresh" handlers/refresh-handler]
38+
["/resize" handlers/resize-handler]
39+
c/datastar-route])
40+
41+
42+
(defn ->router [->sse-handler opts]
43+
(rr/router (->routes ->sse-handler opts)))
44+
45+
46+
(defn ->handler [->sse-response & {:as opts}]
47+
(rr/ring-handler
48+
(->router ->sse-response opts)
49+
(rr/create-default-handler)
50+
{:middleware [reitit-exception/exception-middleware]}))
51+
52+
53+
(def handler-http-kit (->handler hk-gen/->sse-response
54+
{hk-gen/write-profile (brotli/->brotli-profile)}))
55+
56+
(def handler-ring (->handler ring-gen/->sse-response
57+
{ring-gen/write-profile ring-gen/gzip-profile}))
58+
59+
(defn after-ns-reload []
60+
(println "rebooting servers")
61+
(u/reboot-hk-server! #'handler-http-kit)
62+
(u/reboot-jetty-server! #'handler-ring {:async? true}))
63+
64+
65+
(comment
66+
#_{:clj-kondo/ignore true}
67+
(user/reload!)
68+
:help
69+
:dbg
70+
:rec
71+
:stop
72+
*e
73+
state/!state
74+
state/!conns
75+
(reset! state/!conns #{})
76+
77+
(-> state/!state
78+
deref
79+
rendering/page)
80+
(state/resize! 10 10)
81+
(state/resize! 20 20)
82+
(state/resize! 25 25)
83+
(state/resize! 30 30)
84+
(state/resize! 50 50)
85+
(state/reset-state!)
86+
(state/add-random-pings!)
87+
(state/step-state!)
88+
(state/start-animating!)
89+
(u/clear-terminal!)
90+
(u/reboot-hk-server! #'handler-http-kit)
91+
(u/reboot-jetty-server! #'handler-ring {:async? true}))
92+
93+
94+

0 commit comments

Comments
 (0)