|
16 | 16 | [nodely.engine.core-async.core :as nodely.async] |
17 | 17 | [nodely.engine.schema :as schema] |
18 | 18 | [nodely.fixtures :as fixtures] |
19 | | - [nodely.syntax :as syntax :refer [>leaf >sequence >value]] |
| 19 | + [nodely.syntax :as syntax :refer [>leaf >sequence >value blocking]] |
20 | 20 | [nodely.syntax.schema :refer [yielding-schema]] |
21 | 21 | [promesa.core :as p] |
22 | 22 | [schema.core :as s])) |
|
54 | 54 | (>leaf ?c)) |
55 | 55 | :z (>leaf ?w)}) |
56 | 56 |
|
57 | | -(def env-with-blocking-tag {:a (>leaf (Thread/currentThread)) |
58 | | - :b (syntax/blocking (>leaf (Thread/currentThread))) |
59 | | - :a-name (>leaf (.getName ?a)) |
60 | | - :b-name (>leaf (.getName ?b)) |
61 | | - :c (>leaf (str ?a-name " " ?b-name))}) |
62 | | - |
63 | | -(def env-with-nine-sleeps {:a (syntax/blocking (>leaf (do (Thread/sleep 1000) :a))) |
64 | | - :b (syntax/blocking (>leaf (do (Thread/sleep 1000) :b))) |
65 | | - :c (syntax/blocking (>leaf (do (Thread/sleep 1000) :c))) |
66 | | - :d (syntax/blocking (>leaf (do (Thread/sleep 1000) :d))) |
67 | | - :e (syntax/blocking (>leaf (do (Thread/sleep 1000) :e))) |
68 | | - :f (syntax/blocking (>leaf (do (Thread/sleep 1000) :f))) |
69 | | - :g (syntax/blocking (>leaf (do (Thread/sleep 1000) :g))) |
70 | | - :h (syntax/blocking (>leaf (do (Thread/sleep 1000) :h))) |
71 | | - :i (syntax/blocking (>leaf (do (Thread/sleep 1000) :i))) |
72 | | - :z (>leaf (into #{} [?a ?b ?c ?d ?e ?f ?g ?h ?i]))}) |
| 57 | +(def env-with-blocking-take {:a (>leaf {:the-chan (async/chan 1)}) |
| 58 | + :b (blocking (>leaf (async/>!! (:the-chan ?a) :test))) |
| 59 | + :c (>leaf (try (async/<!! (:the-chan ?a)) |
| 60 | + (catch Throwable t {:the-ex t}))) |
| 61 | + :d (>leaf (vector ?b (:the-ex ?c)))}) |
73 | 62 |
|
74 | 63 | (def env-with-sequence {:a (>leaf [1 2 3]) |
75 | 64 | :b (syntax/>sequence inc ?a)}) |
|
254 | 243 | (testing "channel-leaf" |
255 | 244 | (is (= 7 (applicative/eval-key env+channel-leaf :c {::applicative/context core-async/context})))))) |
256 | 245 |
|
257 | | -(deftest core-async-blocking-eval-test |
258 | | - (testing "eval of a blocking tagged node will happen in the `async-thread-macro` worker pool" |
259 | | - (is (match? #"async-thread-macro-\d+" |
260 | | - (applicative/eval-key env-with-blocking-tag :b-name {::applicative/context core-async/context}))) |
261 | | - (is (match? #"async-dispatch-\d+" |
262 | | - (applicative/eval-key env-with-blocking-tag :a-name {::applicative/context core-async/context}))))) |
263 | | - |
264 | | -(deftest thread-sleeping-test-proves-thread-works-how-we-expect |
265 | | - (testing "actually only 8 threads in the async dispatch worker pool" |
266 | | - (is (match? 8 |
267 | | - @@#'clojure.core.async.impl.exec.threadpool/pool-size))) |
268 | | - (testing "when we have 9 1-second blocking nodes in one environment, it can run in fewer than 2 seconds" |
269 | | - (testing "async version runs parallel when option is neglected" |
270 | | - (let [[nanosec-async _] (criterium/time-body (applicative/eval-key env-with-nine-sleeps :z {::applicative/context core-async/context}))] |
271 | | - (is (match? (matchers/within-delta 1000000000 1000000000) |
272 | | - nanosec-async)))))) |
| 246 | +(deftest blocking-tagging-test |
| 247 | + (testing "about the blocking tag" |
| 248 | + (testing "go-checking system property is set" |
| 249 | + (is (Boolean/getBoolean "clojure.core.async.go-checking"))) |
| 250 | + (testing "eval of an env with blocking ops in dispatch worker pool provokes exceptions, evaling same ops in threads doesn't" |
| 251 | + (is (match? [true java.lang.IllegalStateException] |
| 252 | + (update (applicative/eval-key env-with-blocking-take :d {::applicative/context core-async/context}) |
| 253 | + 1 |
| 254 | + class)))))) |
273 | 255 |
|
274 | 256 | (defspec does-not-blow-up-spec |
275 | 257 | (prop/for-all [env (fixtures/env-gen {})] |
|
0 commit comments