@@ -839,6 +839,34 @@ static int janet_chanat_gc(void *p, size_t s) {
839839 return 0 ;
840840}
841841
842+ static void janet_chanat_remove_vmref (JanetQueue * fq ) {
843+ JanetChannelPending * pending = fq -> data ;
844+ if (fq -> head <= fq -> tail ) {
845+ for (int32_t i = fq -> head ; i < fq -> tail ; i ++ ) {
846+ if (pending [i ].thread == & janet_vm ) pending [i ].thread = NULL ;
847+ }
848+ } else {
849+ for (int32_t i = fq -> head ; i < fq -> capacity ; i ++ ) {
850+ if (pending [i ].thread == & janet_vm ) pending [i ].thread = NULL ;
851+ }
852+ for (int32_t i = 0 ; i < fq -> tail ; i ++ ) {
853+ if (pending [i ].thread == & janet_vm ) pending [i ].thread = NULL ;
854+ }
855+ }
856+ }
857+
858+ static int janet_chanat_gcperthread (void * p , size_t s ) {
859+ (void ) s ;
860+ JanetChannel * chan = p ;
861+ janet_chan_lock (chan );
862+ /* Make sure that the internals of the threaded channel no longer reference _this_ thread. Replace
863+ * those references with NULL. */
864+ janet_chanat_remove_vmref (& chan -> read_pending );
865+ janet_chanat_remove_vmref (& chan -> write_pending );
866+ janet_chan_unlock (chan );
867+ return 0 ;
868+ }
869+
842870static void janet_chanat_mark_fq (JanetQueue * fq ) {
843871 JanetChannelPending * pending = fq -> data ;
844872 if (fq -> head <= fq -> tail ) {
@@ -921,27 +949,31 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) {
921949 int is_read = (mode == JANET_CP_MODE_CHOICE_READ ) || (mode == JANET_CP_MODE_READ );
922950 if (is_read ) {
923951 JanetChannelPending reader ;
924- if (!janet_q_pop (& channel -> read_pending , & reader , sizeof (reader ))) {
952+ while (!janet_q_pop (& channel -> read_pending , & reader , sizeof (reader ))) {
925953 JanetVM * vm = reader .thread ;
954+ if (!vm ) continue ;
926955 JanetEVGenericMessage msg ;
927956 msg .tag = reader .mode ;
928957 msg .fiber = reader .fiber ;
929958 msg .argi = (int32_t ) reader .sched_id ;
930959 msg .argp = channel ;
931960 msg .argj = x ;
932961 janet_ev_post_event (vm , janet_thread_chan_cb , msg );
962+ break ;
933963 }
934964 } else {
935965 JanetChannelPending writer ;
936- if (!janet_q_pop (& channel -> write_pending , & writer , sizeof (writer ))) {
966+ while (!janet_q_pop (& channel -> write_pending , & writer , sizeof (writer ))) {
937967 JanetVM * vm = writer .thread ;
968+ if (!vm ) continue ;
938969 JanetEVGenericMessage msg ;
939970 msg .tag = writer .mode ;
940971 msg .fiber = writer .fiber ;
941972 msg .argi = (int32_t ) writer .sched_id ;
942973 msg .argp = channel ;
943974 msg .argj = janet_wrap_nil ();
944975 janet_ev_post_event (vm , janet_thread_chan_cb , msg );
976+ break ;
945977 }
946978 }
947979 }
@@ -1005,7 +1037,9 @@ static int janet_channel_push_with_lock(JanetChannel *channel, Janet x, int mode
10051037 msg .argi = (int32_t ) reader .sched_id ;
10061038 msg .argp = channel ;
10071039 msg .argj = x ;
1008- janet_ev_post_event (vm , janet_thread_chan_cb , msg );
1040+ if (vm ) {
1041+ janet_ev_post_event (vm , janet_thread_chan_cb , msg );
1042+ }
10091043 } else {
10101044 if (reader .mode == JANET_CP_MODE_CHOICE_READ ) {
10111045 janet_schedule (reader .fiber , make_read_result (channel , x ));
@@ -1060,7 +1094,9 @@ static int janet_channel_pop_with_lock(JanetChannel *channel, Janet *item, int i
10601094 msg .argi = (int32_t ) writer .sched_id ;
10611095 msg .argp = channel ;
10621096 msg .argj = janet_wrap_nil ();
1063- janet_ev_post_event (vm , janet_thread_chan_cb , msg );
1097+ if (vm ) {
1098+ janet_ev_post_event (vm , janet_thread_chan_cb , msg );
1099+ }
10641100 } else {
10651101 if (writer .mode == JANET_CP_MODE_CHOICE_WRITE ) {
10661102 janet_schedule (writer .fiber , make_write_result (channel ));
@@ -1324,7 +1360,9 @@ JANET_CORE_FN(cfun_channel_close,
13241360 msg .tag = JANET_CP_MODE_CLOSE ;
13251361 msg .argi = (int32_t ) writer .sched_id ;
13261362 msg .argj = janet_wrap_nil ();
1327- janet_ev_post_event (vm , janet_thread_chan_cb , msg );
1363+ if (vm ) {
1364+ janet_ev_post_event (vm , janet_thread_chan_cb , msg );
1365+ }
13281366 } else {
13291367 if (janet_fiber_can_resume (writer .fiber )) {
13301368 if (writer .mode == JANET_CP_MODE_CHOICE_WRITE ) {
@@ -1345,7 +1383,9 @@ JANET_CORE_FN(cfun_channel_close,
13451383 msg .tag = JANET_CP_MODE_CLOSE ;
13461384 msg .argi = (int32_t ) reader .sched_id ;
13471385 msg .argj = janet_wrap_nil ();
1348- janet_ev_post_event (vm , janet_thread_chan_cb , msg );
1386+ if (vm ) {
1387+ janet_ev_post_event (vm , janet_thread_chan_cb , msg );
1388+ }
13491389 } else {
13501390 if (janet_fiber_can_resume (reader .fiber )) {
13511391 if (reader .mode == JANET_CP_MODE_CHOICE_READ ) {
@@ -1438,7 +1478,10 @@ const JanetAbstractType janet_channel_type = {
14381478 NULL , /* compare */
14391479 NULL , /* hash */
14401480 janet_chanat_next ,
1441- JANET_ATEND_NEXT
1481+ NULL , /* call */
1482+ NULL , /* length */
1483+ NULL , /* bytes */
1484+ janet_chanat_gcperthread
14421485};
14431486
14441487/* Main event loop */
0 commit comments