|
| 1 | +(ns starfederation.datastar.clojure.adapter.http-kit2 |
| 2 | + (:require |
| 3 | + [org.httpkit.server :as hk-server] |
| 4 | + [starfederation.datastar.clojure.adapter.common :as ac] |
| 5 | + [starfederation.datastar.clojure.adapter.http-kit.impl :as impl] |
| 6 | + [starfederation.datastar.clojure.utils :refer [def-clone]])) |
| 7 | + |
| 8 | + |
| 9 | +(def-clone on-open ac/on-open) |
| 10 | +(def-clone on-close ac/on-close) |
| 11 | +(def-clone on-exception ac/on-exception) |
| 12 | +(def-clone default-on-exception ac/default-on-exception) |
| 13 | + |
| 14 | + |
| 15 | +(def-clone write-profile ac/write-profile) |
| 16 | + |
| 17 | +(def-clone basic-profile impl/basic-profile) |
| 18 | +(def-clone buffered-writer-profile ac/buffered-writer-profile) |
| 19 | +(def-clone gzip-profile ac/gzip-profile) |
| 20 | +(def-clone gzip-buffered-writer-profile ac/gzip-buffered-writer-profile) |
| 21 | + |
| 22 | + |
| 23 | +(defn- as-channel |
| 24 | + " |
| 25 | + Replacement for [[hk-server/as-channel]] that doesn't deal with websockets |
| 26 | + and doen't call `on-open` itself. |
| 27 | +
|
| 28 | + `on-open` is meant to be called by either a middleware or an interceptor on the return. |
| 29 | + " |
| 30 | + [ring-req {:keys [on-close on-open init]}] |
| 31 | + |
| 32 | + (when-let [ch (:async-channel ring-req)] |
| 33 | + |
| 34 | + (when-let [f init] (f ch)) |
| 35 | + (when-let [f on-close] (org.httpkit.server/on-close ch (partial f ch))) |
| 36 | + |
| 37 | + {:body ch ::on-open on-open})) |
| 38 | + |
| 39 | + |
| 40 | +(defn ->sse-response |
| 41 | + "Make a Ring like response that will start a SSE stream. |
| 42 | +
|
| 43 | + The status code and the the SSE specific headers are not sent automatically. |
| 44 | + You need to use either [[start-responding-middleware]] or |
| 45 | + [[start-responding-interceptor]]. |
| 46 | +
|
| 47 | + Note that the SSE connection stays opened util you close it. |
| 48 | +
|
| 49 | + General options: |
| 50 | + - `:status`: status for the HTTP response, defaults to 200. |
| 51 | + - `:headers`: ring headers map to add to the response. |
| 52 | + - [[on-open]]: mandatory callback called when the generator is ready to send. |
| 53 | + - [[on-close]]: callback called when the underlying Http-kit AsyncChannel is |
| 54 | + closed. It receives a second argument, the `:status-code` value we get from |
| 55 | + the closing AsyncChannel. |
| 56 | + - [[on-exception]]: callback called when sending a SSE event throws. |
| 57 | + - [[write-profile]]: write profile for the connection. |
| 58 | + Defaults to [[basic-profile]] |
| 59 | +
|
| 60 | + SDK provided write profiles: |
| 61 | + - [[basic-profile]] |
| 62 | + - [[buffered-writer-profile]] |
| 63 | + - [[gzip-profile]] |
| 64 | + - [[gzip-buffered-writer-profile]] |
| 65 | +
|
| 66 | + You can also take a look at the `starfederation.datastar.clojure.adapter.common` |
| 67 | + namespace if you want to write your own profiles. |
| 68 | + " |
| 69 | + [ring-request {:keys [status] :as opts}] |
| 70 | + {:pre [(ac/on-open opts)]} |
| 71 | + (let [on-open-cb (ac/on-open opts) |
| 72 | + on-close-cb (ac/on-close opts) |
| 73 | + future-send! (promise) |
| 74 | + future-gen (promise)] |
| 75 | + (assoc |
| 76 | + (as-channel ring-request |
| 77 | + {:on-open |
| 78 | + (fn [ch] |
| 79 | + (let [send! (impl/->send! ch opts) |
| 80 | + sse-gen (impl/->sse-gen ch send! opts)] |
| 81 | + (deliver future-gen sse-gen) |
| 82 | + (deliver future-send! send!) |
| 83 | + (on-open-cb sse-gen))) |
| 84 | + |
| 85 | + :on-close |
| 86 | + (fn [_ status] |
| 87 | + (let [closing-res |
| 88 | + (ac/close-sse! |
| 89 | + #(when-let [send! (deref future-send! 0 nil)] (send!)) |
| 90 | + #(when on-close-cb |
| 91 | + (on-close-cb (deref future-gen 0 nil) status)))] |
| 92 | + (if (instance? Exception closing-res) |
| 93 | + (throw closing-res) |
| 94 | + closing-res)))}) |
| 95 | + :status (or status 200) |
| 96 | + :headers (ac/headers ring-request opts) |
| 97 | + ::datastar-sse-response true))) |
| 98 | + |
| 99 | + |
| 100 | +(defn start-responding! |
| 101 | + "Function that takes a ring response map and sends HTTP status & headers using |
| 102 | + the [[AsyncChannel]] that should be in the body if the |
| 103 | + `::datastar-sse-response` key is present." |
| 104 | + [response] |
| 105 | + (if (::datastar-sse-response response) |
| 106 | + (let [{on-open ::on-open |
| 107 | + ch :body} response |
| 108 | + response (dissoc response :body ::on-open ::datastar-sse-response)] |
| 109 | + (hk-server/send! ch response false) |
| 110 | + (on-open ch)) |
| 111 | + response)) |
| 112 | + |
| 113 | + |
| 114 | + |
| 115 | +(defn wrap-start-responding |
| 116 | + "Middleware necessary to use in conjunction with [[->sse-response]]. |
| 117 | +
|
| 118 | + It will check if the response is a datastar-sse-response |
| 119 | + (created with [[->sse-response]]). In this case it will send the initial |
| 120 | + response containing headers and status code, then call `on-open`." |
| 121 | + [handler] |
| 122 | + (fn |
| 123 | + ([req] |
| 124 | + (let [response (handler req)] |
| 125 | + (start-responding! response) |
| 126 | + response)) |
| 127 | + ([req respond _raise] |
| 128 | + (let [response (handler req)] |
| 129 | + (start-responding! response) |
| 130 | + (respond response))))) |
| 131 | + |
| 132 | + |
| 133 | +(def start-responding-middleware |
| 134 | + "Reitit middleware map for [[wrap-start-responding]]." |
| 135 | + {:name ::start-responding |
| 136 | + :wrap wrap-start-responding}) |
| 137 | + |
| 138 | + |
| 139 | +(def start-responding-interceptor |
| 140 | + " |
| 141 | + Interceptor necessary to use in conjunction with [[->sse-response]]. |
| 142 | +
|
| 143 | + In the `:leave` fn, it will check if the response is a datastar-sse-response |
| 144 | + (created with [[->sse-response]]). In this case it will send the initial |
| 145 | + response containing headers and status code, then call `on-open`." |
| 146 | + {:name ::start-responding |
| 147 | + :leave (fn [ctx] |
| 148 | + (let [response (:response ctx)] |
| 149 | + (start-responding! response) |
| 150 | + ctx))}) |
| 151 | + |
0 commit comments